YARN-5333. Some recovered apps are put into default queue when RM HA. Contributed by Jun Gong.

This commit is contained in:
Rohith Sharma K S 2016-08-05 21:35:49 +05:30
parent 4a26221021
commit d9a354c2f3
2 changed files with 142 additions and 45 deletions

View File

@ -47,7 +47,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.HAUtil;
@ -319,15 +318,7 @@ public class AdminService extends CompositeService implements
UserGroupInformation user = checkAccess("transitionToActive");
checkHaStateChange(reqInfo);
try {
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
try {
// call all refresh*s for active RM to get the updated configurations.
refreshAll();
@ -337,10 +328,22 @@ public class AdminService extends CompositeService implements
.getDispatcher()
.getEventHandler()
.handle(
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED,
e));
throw new ServiceFailedException(
"Error on refreshAll during transistion to Active", e);
"Error on refreshAll during transition to Active", e);
}
try {
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
"RM");
}
@ -408,12 +411,7 @@ public class AdminService extends CompositeService implements
RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
// refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem();
if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext);
}
refreshQueues();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return response;
@ -422,6 +420,15 @@ public class AdminService extends CompositeService implements
}
}
private void refreshQueues() throws IOException, YarnException {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
// refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem();
if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext);
}
}
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException, StandbyException {
@ -454,6 +461,13 @@ public class AdminService extends CompositeService implements
}
}
private void refreshNodes() throws IOException, YarnException {
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getNodesListManager().refreshNodes(conf);
}
@Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
@ -464,6 +478,16 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation,
"refresh super-user-groups.");
refreshSuperUserGroupsConfiguration();
RMAuditLogger.logSuccess(user.getShortUserName(),
operation, "AdminService");
return recordFactory.newRecordInstance(
RefreshSuperUserGroupsConfigurationResponse.class);
}
private void refreshSuperUserGroupsConfiguration()
throws IOException, YarnException {
// Accept hadoop common configs in core-site.xml as well as RM specific
// configurations in yarn-site.xml
Configuration conf =
@ -472,11 +496,6 @@ public class AdminService extends CompositeService implements
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
RMServerUtils.processRMProxyUsersConf(conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
RMAuditLogger.logSuccess(user.getShortUserName(),
operation, "AdminService");
return recordFactory.newRecordInstance(
RefreshSuperUserGroupsConfigurationResponse.class);
}
@Override
@ -488,10 +507,7 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation, "refresh user-groups.");
Groups.getUserToGroupsMappingService(
getConfiguration(new Configuration(false),
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
refreshUserToGroupsMappings();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -499,6 +515,12 @@ public class AdminService extends CompositeService implements
RefreshUserToGroupsMappingsResponse.class);
}
private void refreshUserToGroupsMappings() throws IOException, YarnException {
Groups.getUserToGroupsMappingService(
getConfiguration(new Configuration(false),
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
}
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
@ -541,6 +563,14 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs.");
refreshServiceAcls();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
private void refreshServiceAcls() throws IOException, YarnException {
PolicyProvider policyProvider = RMPolicyProvider.getInstance();
Configuration conf =
getConfiguration(new Configuration(false),
@ -552,11 +582,6 @@ public class AdminService extends CompositeService implements
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
private synchronized void refreshServiceAcls(Configuration configuration,
@ -691,18 +716,17 @@ public class AdminService extends CompositeService implements
@VisibleForTesting
void refreshAll() throws ServiceFailedException {
try {
refreshQueues(RefreshQueuesRequest.newInstance());
refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL));
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest.newInstance());
checkAcls("refreshAll");
refreshQueues();
refreshNodes();
refreshSuperUserGroupsConfiguration();
refreshUserToGroupsMappings();
if (getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
refreshServiceAcls();
}
refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance());
refreshClusterMaxPriority();
} catch (Exception ex) {
throw new ServiceFailedException(ex.getMessage());
}
@ -839,11 +863,7 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation, msg);
try {
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getScheduler().setClusterMaxPriority(conf);
refreshClusterMaxPriority();
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
@ -854,6 +874,14 @@ public class AdminService extends CompositeService implements
}
}
private void refreshClusterMaxPriority() throws IOException, YarnException {
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getScheduler().setClusterMaxPriority(conf);
}
public String getHAZookeeperConnectionState() {
if (!rmContext.isHAEnabled()) {
return "ResourceManager HA is not enabled.";

View File

@ -49,6 +49,7 @@ import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetworkTopology;
@ -74,9 +75,13 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -84,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -4648,4 +4654,67 @@ public class TestFairScheduler extends FairSchedulerTestBase {
long reservedId = reservedContainer1.getContainerId().getContainerId();
assertEquals(reservedId + 1, maxId);
}
@Test(timeout = 120000)
public void testRefreshQueuesWhenRMHA() throws Exception {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
HAServiceProtocol.StateChangeRequestInfo requestInfo =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
// 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues
MockRM rm1 = new MockRM(conf, null);
rm1.init(conf);
rm1.start();
rm1.getAdminService().transitionToStandby(requestInfo);
// 2. add a new queue "test_queue"
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"test_queue\">");
out.println(" <maxRunningApps>3</maxRunningApps>");
out.println("</queue>");
out.println("</allocations>");
out.close();
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// 3. start a active RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.init(conf);
rm2.start();
MockNM nm =
new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
nm.registerNode();
rm2.getAdminService().transitionToActive(requestInfo);
// 4. submit a app to the new added queue "test_queue"
RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue");
RMAppAttempt attempt0 = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId());
am0.registerAppAttempt();
assertEquals("root.test_queue", app.getQueue());
// 5. Transit rm1 to active, recover app
((RMContextImpl)rm1.getRMContext()).setStateStore(memStore);
rm1.getAdminService().transitionToActive(requestInfo);
rm1.drainEvents();
assertEquals(1, rm1.getRMContext().getRMApps().size());
RMApp recoveredApp =
rm1.getRMContext().getRMApps().values().iterator().next();
assertEquals("root.test_queue", recoveredApp.getQueue());
rm1.stop();
rm2.stop();
}
}