YARN-1734. Fixed ResourceManager to update the configurations when it transits from standby to active mode so as to assimilate any changes that happened while it was in standby mode. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1571539 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1571540 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-25 02:08:36 +00:00
parent a6529b363c
commit ffd554b271
3 changed files with 138 additions and 2 deletions

View File

@ -333,6 +333,10 @@ Release 2.4.0 - UNRELEASED
re-registration after a RESYNC and thus avoid hanging. (Rohith Sharma via
vinodkv)
YARN-1734. Fixed ResourceManager to update the configurations when it
transits from standby to active mode so as to assimilate any changes that
happened while it was in standby mode. (Xuan Gong via vinodkv)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -250,10 +250,20 @@ public class AdminService extends CompositeService implements
@Override
public synchronized void transitionToActive(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
// call refreshAdminAcls before HA state transition
// for the case that adminAcls have been updated in previous active RM
try {
refreshAdminAcls(false);
} catch (YarnException ex) {
throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
}
UserGroupInformation user = checkAccess("transitionToActive");
checkHaStateChange(reqInfo);
try {
rm.transitionToActive();
// call all refresh*s for active RM to get the updated configurations.
refreshAll();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
@ -268,6 +278,13 @@ public class AdminService extends CompositeService implements
@Override
public synchronized void transitionToStandby(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
// call refreshAdminAcls before HA state transition
// for the case that adminAcls have been updated in previous active RM
try {
refreshAdminAcls(false);
} catch (YarnException ex) {
throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
}
UserGroupInformation user = checkAccess("transitionToStandby");
checkHaStateChange(reqInfo);
try {
@ -406,10 +423,15 @@ public class AdminService extends CompositeService implements
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
return refreshAdminAcls(true);
}
private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState)
throws YarnException, IOException {
String argName = "refreshAdminAcls";
UserGroupInformation user = checkAcls(argName);
if (!isRMActive()) {
if (checkRMHAState && !isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh user-groups.");
@ -521,6 +543,24 @@ public class AdminService extends CompositeService implements
return conf;
}
private void refreshAll() throws ServiceFailedException {
try {
refreshQueues(RefreshQueuesRequest.newInstance());
refreshNodes(RefreshNodesRequest.newInstance());
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest.newInstance());
if (getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
}
} catch (Exception ex) {
throw new ServiceFailedException(ex.getMessage());
}
}
@VisibleForTesting
public AccessControlList getAccessControlList() {
return this.adminAcl;

View File

@ -34,12 +34,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@ -518,6 +522,94 @@ public class TestRMAdminService {
Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
}
@Test
public void testRMHAWithFileSystemBasedConfiguration() throws IOException,
YarnException {
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
int base = 100;
for (String confKey : YarnConfiguration
.getServiceAddressConfKeys(configuration)) {
configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+ (base + 20));
configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+ (base + 40));
base = base * 2;
}
Configuration conf1 = new Configuration(configuration);
conf1.set(YarnConfiguration.RM_HA_ID, "rm1");
Configuration conf2 = new Configuration(configuration);
conf2.set(YarnConfiguration.RM_HA_ID, "rm2");
// upload default configurations
uploadDefaultConfiguration();
MockRM rm1 = null;
MockRM rm2 = null;
try {
rm1 = new MockRM(conf1);
rm1.init(conf1);
rm1.start();
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm2 = new MockRM(conf2);
rm2.init(conf1);
rm2.start();
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm1.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
uploadConfiguration(csConf, "capacity-scheduler.xml");
rm1.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
int maxApps =
((CapacityScheduler) rm1.getRMContext().getScheduler())
.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxApps, 5000);
// Before failover happens, the maxApps is
// still the default value on the standby rm : rm2
int maxAppsBeforeFailOver =
((CapacityScheduler) rm2.getRMContext().getScheduler())
.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxAppsBeforeFailOver, 10000);
// Do the failover
rm1.adminService.transitionToStandby(requestInfo);
rm2.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
int maxAppsAfter =
((CapacityScheduler) rm2.getRMContext().getScheduler())
.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxAppsAfter, 5000);
} finally {
if (rm1 != null) {
rm1.stop();
}
if (rm2 != null) {
rm2.stop();
}
}
}
private String writeConfigurationXML(Configuration conf, String confXMLName)
throws IOException {
DataOutputStream output = null;