YARN-5333. Some recovered apps are put into default queue when RM HA. Contributed by Jun Gong.

(cherry picked from commit d9a354c2f39274b2810144d1ae133201e44e3bfc)
This commit is contained in:
Rohith Sharma K S 2016-08-05 21:35:49 +05:30
parent 6863866127
commit 69da77c357
2 changed files with 142 additions and 45 deletions

View File

@ -47,7 +47,6 @@
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
@ -319,15 +318,7 @@ public synchronized void transitionToActive(
UserGroupInformation user = checkAccess("transitionToActive"); UserGroupInformation user = checkAccess("transitionToActive");
checkHaStateChange(reqInfo); checkHaStateChange(reqInfo);
try {
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
try { try {
// call all refresh*s for active RM to get the updated configurations. // call all refresh*s for active RM to get the updated configurations.
refreshAll(); refreshAll();
@ -337,10 +328,22 @@ public synchronized void transitionToActive(
.getDispatcher() .getDispatcher()
.getEventHandler() .getEventHandler()
.handle( .handle(
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED,
e));
throw new ServiceFailedException( throw new ServiceFailedException(
"Error on refreshAll during transistion to Active", e); "Error on refreshAll during transition to Active", e);
} }
try {
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
"RM"); "RM");
} }
@ -408,12 +411,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
RefreshQueuesResponse response = RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class); recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try { try {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); refreshQueues();
// refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem();
if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext);
}
RMAuditLogger.logSuccess(user.getShortUserName(), operation, RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService"); "AdminService");
return response; return response;
@ -422,6 +420,15 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
} }
} }
/**
 * Reinitializes the scheduler with this service's current configuration and
 * then, when a reservation system is present, reinitializes it as well.
 *
 * @throws IOException if reinitialization fails with an I/O error
 * @throws YarnException if reinitialization fails with a YARN error
 */
private void refreshQueues() throws IOException, YarnException {
  rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);

  final ReservationSystem reservationSystem =
      rmContext.getReservationSystem();
  if (reservationSystem == null) {
    // No reservation system available from the context; nothing else to do.
    return;
  }
  // Refresh the reservation system after the scheduler.
  reservationSystem.reinitialize(getConfig(), rmContext);
}
@Override @Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException, StandbyException { throws YarnException, StandbyException {
@ -454,6 +461,13 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
} }
} }
/**
 * Obtains a configuration for the yarn-site file via
 * {@code getConfiguration}, starting from a {@link Configuration} created
 * with default loading disabled, and passes it to the nodes list manager's
 * {@code refreshNodes}.
 *
 * @throws IOException if the refresh fails with an I/O error
 * @throws YarnException if the refresh fails with a YARN error
 */
private void refreshNodes() throws IOException, YarnException {
  final Configuration emptyConf = new Configuration(false);
  final Configuration yarnSiteConf = getConfiguration(emptyConf,
      YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
  rmContext.getNodesListManager().refreshNodes(yarnSiteConf);
}
@Override @Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request) RefreshSuperUserGroupsConfigurationRequest request)
@ -464,6 +478,16 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu
checkRMStatus(user.getShortUserName(), operation, checkRMStatus(user.getShortUserName(), operation,
"refresh super-user-groups."); "refresh super-user-groups.");
refreshSuperUserGroupsConfiguration();
RMAuditLogger.logSuccess(user.getShortUserName(),
operation, "AdminService");
return recordFactory.newRecordInstance(
RefreshSuperUserGroupsConfigurationResponse.class);
}
private void refreshSuperUserGroupsConfiguration()
throws IOException, YarnException {
// Accept hadoop common configs in core-site.xml as well as RM specific // Accept hadoop common configs in core-site.xml as well as RM specific
// configurations in yarn-site.xml // configurations in yarn-site.xml
Configuration conf = Configuration conf =
@ -472,11 +496,6 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
RMServerUtils.processRMProxyUsersConf(conf); RMServerUtils.processRMProxyUsersConf(conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
RMAuditLogger.logSuccess(user.getShortUserName(),
operation, "AdminService");
return recordFactory.newRecordInstance(
RefreshSuperUserGroupsConfigurationResponse.class);
} }
@Override @Override
@ -488,10 +507,7 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
checkRMStatus(user.getShortUserName(), operation, "refresh user-groups."); checkRMStatus(user.getShortUserName(), operation, "refresh user-groups.");
Groups.getUserToGroupsMappingService( refreshUserToGroupsMappings();
getConfiguration(new Configuration(false),
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
RMAuditLogger.logSuccess(user.getShortUserName(), operation, RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService"); "AdminService");
@ -499,6 +515,12 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsResponse.class); RefreshUserToGroupsMappingsResponse.class);
} }
/**
 * Obtains a configuration for the core-site file via
 * {@code getConfiguration}, looks up the user-to-groups mapping service for
 * it, and calls {@code refresh()} on that service.
 *
 * @throws IOException if the refresh fails with an I/O error
 * @throws YarnException if the refresh fails with a YARN error
 */
private void refreshUserToGroupsMappings() throws IOException, YarnException {
  final Configuration coreSiteConf = getConfiguration(
      new Configuration(false),
      YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
  final Groups groupsService =
      Groups.getUserToGroupsMappingService(coreSiteConf);
  groupsService.refresh();
}
@Override @Override
public RefreshAdminAclsResponse refreshAdminAcls( public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException { RefreshAdminAclsRequest request) throws YarnException, IOException {
@ -541,6 +563,14 @@ public RefreshServiceAclsResponse refreshServiceAcls(
checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs."); checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs.");
refreshServiceAcls();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
private void refreshServiceAcls() throws IOException, YarnException {
PolicyProvider policyProvider = RMPolicyProvider.getInstance(); PolicyProvider policyProvider = RMPolicyProvider.getInstance();
Configuration conf = Configuration conf =
getConfiguration(new Configuration(false), getConfiguration(new Configuration(false),
@ -552,11 +582,6 @@ public RefreshServiceAclsResponse refreshServiceAcls(
conf, policyProvider); conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls( rmContext.getResourceTrackerService().refreshServiceAcls(
conf, policyProvider); conf, policyProvider);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
} }
private synchronized void refreshServiceAcls(Configuration configuration, private synchronized void refreshServiceAcls(Configuration configuration,
@ -691,18 +716,17 @@ private synchronized Configuration getConfiguration(Configuration conf,
@VisibleForTesting @VisibleForTesting
void refreshAll() throws ServiceFailedException { void refreshAll() throws ServiceFailedException {
try { try {
refreshQueues(RefreshQueuesRequest.newInstance()); checkAcls("refreshAll");
refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL)); refreshQueues();
refreshSuperUserGroupsConfiguration( refreshNodes();
RefreshSuperUserGroupsConfigurationRequest.newInstance()); refreshSuperUserGroupsConfiguration();
refreshUserToGroupsMappings( refreshUserToGroupsMappings();
RefreshUserToGroupsMappingsRequest.newInstance());
if (getConfig().getBoolean( if (getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) { false)) {
refreshServiceAcls(RefreshServiceAclsRequest.newInstance()); refreshServiceAcls();
} }
refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance()); refreshClusterMaxPriority();
} catch (Exception ex) { } catch (Exception ex) {
throw new ServiceFailedException(ex.getMessage()); throw new ServiceFailedException(ex.getMessage());
} }
@ -839,11 +863,7 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
checkRMStatus(user.getShortUserName(), operation, msg); checkRMStatus(user.getShortUserName(), operation, msg);
try { try {
Configuration conf = refreshClusterMaxPriority();
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getScheduler().setClusterMaxPriority(conf);
RMAuditLogger RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService"); .logSuccess(user.getShortUserName(), operation, "AdminService");
@ -854,6 +874,14 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
} }
} }
/**
 * Obtains a configuration for the yarn-site file via
 * {@code getConfiguration} and hands it to the scheduler's
 * {@code setClusterMaxPriority}.
 *
 * @throws IOException if the refresh fails with an I/O error
 * @throws YarnException if the refresh fails with a YARN error
 */
private void refreshClusterMaxPriority() throws IOException, YarnException {
  final Configuration emptyConf = new Configuration(false);
  final Configuration yarnSiteConf = getConfiguration(emptyConf,
      YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
  rmContext.getScheduler().setClusterMaxPriority(yarnSiteConf);
}
public String getHAZookeeperConnectionState() { public String getHAZookeeperConnectionState() {
if (!rmContext.isHAEnabled()) { if (!rmContext.isHAEnabled()) {
return "ResourceManager HA is not enabled."; return "ResourceManager HA is not enabled.";

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
@ -74,9 +75,13 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -84,6 +89,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -4648,4 +4654,67 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception {
long reservedId = reservedContainer1.getContainerId().getContainerId(); long reservedId = reservedContainer1.getContainerId().getContainerId();
assertEquals(reservedId + 1, maxId); assertEquals(reservedId + 1, maxId);
} }
/**
 * Checks that an application submitted to a queue which was added to the
 * allocation file while {@code rm1} was in standby is still reported in
 * that queue after {@code rm1} transitions to active and recovers it.
 *
 * <p>Both RMs are stopped in a {@code finally} block so that a failed
 * assertion or an unexpected exception does not leave them running.
 */
@Test(timeout = 120000)
public void testRefreshQueuesWhenRMHA() throws Exception {
  conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
  conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
  conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

  HAServiceProtocol.StateChangeRequestInfo requestInfo =
      new HAServiceProtocol.StateChangeRequestInfo(
          HAServiceProtocol.RequestSource.REQUEST_BY_USER);

  MockRM rm1 = null;
  MockRM rm2 = null;
  try {
    // 1. Start a standby RM. The file 'ALLOC_FILE' is empty at this point,
    //    so there are no queues.
    rm1 = new MockRM(conf, null);
    rm1.init(conf);
    rm1.start();
    rm1.getAdminService().transitionToStandby(requestInfo);

    // 2. Add a new queue "test_queue".
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    try {
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>");
      out.println("<queue name=\"test_queue\">");
      out.println(" <maxRunningApps>3</maxRunningApps>");
      out.println("</queue>");
      out.println("</allocations>");
    } finally {
      out.close();
    }

    MemoryRMStateStore memStore = new MemoryRMStateStore();
    memStore.init(conf);

    // 3. Start an active RM.
    rm2 = new MockRM(conf, memStore);
    rm2.init(conf);
    rm2.start();

    MockNM nm =
        new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
    nm.registerNode();

    rm2.getAdminService().transitionToActive(requestInfo);

    // 4. Submit an app to the newly added queue "test_queue".
    RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue");
    RMAppAttempt attempt0 = app.getCurrentAppAttempt();
    nm.nodeHeartbeat(true);
    MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId());
    am0.registerAppAttempt();
    assertEquals("root.test_queue", app.getQueue());

    // 5. Transition rm1 to active and recover the app.
    ((RMContextImpl) rm1.getRMContext()).setStateStore(memStore);
    rm1.getAdminService().transitionToActive(requestInfo);
    rm1.drainEvents();
    assertEquals(1, rm1.getRMContext().getRMApps().size());
    RMApp recoveredApp =
        rm1.getRMContext().getRMApps().values().iterator().next();
    assertEquals("root.test_queue", recoveredApp.getQueue());
  } finally {
    // Stop in the same order as before (rm1, then rm2); the nested finally
    // makes sure rm2 is stopped even if stopping rm1 throws.
    try {
      if (rm1 != null) {
        rm1.stop();
      }
    } finally {
      if (rm2 != null) {
        rm2.stop();
      }
    }
  }
}
} }