YARN-1669. Modified RM HA handling of protocol level service-ACLS to be available across RM failover by making using of a remote configuration-provider. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564549 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-04 22:46:52 +00:00
parent cd1e8d2e34
commit b812af964d
10 changed files with 254 additions and 35 deletions

View File

@ -450,6 +450,14 @@ public void refreshServiceAcl(Configuration conf, PolicyProvider provider) {
serviceAuthorizationManager.refresh(conf, provider);
}
/**
* Refresh the service authorization ACL for the service handled by this server
* using the specified Configuration.
*/
public void refreshServiceAclWithConfigration(Configuration conf,
PolicyProvider provider) {
serviceAuthorizationManager.refreshWithConfiguration(conf, provider);
}
/**
* Returns a handle to the serviceAuthorizationManager (required in tests)
* @return instance of ServiceAuthorizationManager for this server

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
/**
* An authorization manager which handles service-level authorization
* for incoming service requests.
@ -120,7 +122,11 @@ public synchronized void refresh(Configuration conf,
// Make a copy of the original config, and load the policy file
Configuration policyConf = new Configuration(conf);
policyConf.addResource(policyFile);
refreshWithConfiguration(policyConf, provider);
}
public synchronized void refreshWithConfiguration(Configuration conf,
PolicyProvider provider) {
final Map<Class<?>, AccessControlList> newAcls =
new IdentityHashMap<Class<?>, AccessControlList>();
@ -130,7 +136,7 @@ public synchronized void refresh(Configuration conf,
for (Service service : services) {
AccessControlList acl =
new AccessControlList(
policyConf.get(service.getServiceKey(),
conf.get(service.getServiceKey(),
AccessControlList.WILDCARD_ACL_VALUE)
);
newAcls.put(service.getProtocol(), acl);
@ -141,8 +147,13 @@ public synchronized void refresh(Configuration conf,
protocolToAcl = newAcls;
}
// Package-protected for use in tests.
Set<Class<?>> getProtocolsWithAcls() {
@VisibleForTesting
public Set<Class<?>> getProtocolsWithAcls() {
return protocolToAcl.keySet();
}
@VisibleForTesting
public AccessControlList getProtocolsAcls(Class<?> className) {
return protocolToAcl.get(className);
}
}

View File

@ -122,6 +122,10 @@ Release 2.4.0 - UNRELEASED
to be consistent with what exists (false) in the code and documentation.
(Kenji Kikushima via vinodkv)
YARN-1669. Modified RM HA handling of protocol level service-ACLS to
be available across RM failover by making using of a remote
configuration-provider. (Xuan Gong via vinodkv)
BUG FIXES
YARN-935. Correcting pom.xml to build applicationhistoryserver module

View File

@ -40,6 +40,10 @@ public class YarnConfiguration extends Configuration {
@Private
public static final String CS_CONFIGURATION_FILE= "capacity-scheduler.xml";
@Private
public static final String HADOOP_POLICY_CONFIGURATION_FILE =
"hadoop-policy.xml";
@Private
public static final String YARN_SITE_XML_FILE = "yarn-site.xml";

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
@ -432,9 +433,8 @@ public RefreshAdminAclsResponse refreshAdminAcls(
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException {
Configuration conf = new Configuration();
if (!conf.getBoolean(
RefreshServiceAclsRequest request) throws YarnException, IOException {
if (!getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
throw RPCUtil.getRemoteException(
@ -443,26 +443,37 @@ public RefreshServiceAclsResponse refreshServiceAcls(
") not enabled."));
}
String argName = "refreshServiceAcls";
if (!isRMActive()) {
RMAuditLogger.logFailure(UserGroupInformation.getCurrentUser()
.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh Service ACLs.");
throwStandbyException();
}
PolicyProvider policyProvider = new RMPolicyProvider();
Configuration conf =
getConfiguration(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
refreshServiceAcls(conf, policyProvider);
if (isRMActive()) {
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
rmContext.getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
} else {
LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
"Clients, ApplicationMasters and NodeManagers");
}
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
void refreshServiceAcls(Configuration configuration,
synchronized void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.configurationProvider instanceof LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
@Override
@ -519,4 +530,9 @@ private synchronized Configuration getConfiguration(String confFileName)
public AccessControlList getAccessControlList() {
return this.adminAcl;
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -86,6 +87,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
@ -102,6 +105,7 @@ public class ApplicationMasterService extends AbstractService implements
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
private boolean useLocalConfigurationProvider;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
@ -111,6 +115,15 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
@ -578,7 +591,12 @@ public void unregisterAttempt(ApplicationAttemptId attemptId) {
public void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
@Override
@ -604,4 +622,9 @@ public synchronized void setAllocateResponse(AllocateResponse response) {
this.response = response;
}
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@ -106,6 +107,7 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
@ -133,6 +135,7 @@ public class ClientRMService extends AbstractService implements
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
private boolean useLocalConfigurationProvider;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@ -150,6 +153,10 @@ public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
@Override
protected void serviceInit(Configuration conf) throws Exception {
clientBindAddress = getBindAddress(conf);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -773,7 +780,12 @@ private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
private boolean isAllowedDelegationTokenOp() throws IOException {
@ -787,4 +799,9 @@ private boolean isAllowedDelegationTokenOp() throws IOException {
return true;
}
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -66,6 +67,8 @@
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
public class ResourceTrackerService extends AbstractService implements
ResourceTracker {
@ -92,6 +95,7 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
private boolean useLocalConfigurationProvider;
static {
resync.setNodeAction(NodeAction.RESYNC);
@ -141,6 +145,10 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -415,6 +423,16 @@ public static Node resolve(String hostName) {
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -262,10 +263,10 @@ public Resource getClusterResources() {
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
if (!initialized) {
this.useLocalConfigurationProvider = conf.get(
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS).equals(
"org.apache.hadoop.yarn.LocalConfigurationProvider");
LocalConfigurationProvider.class)));
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);

View File

@ -24,16 +24,19 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -190,6 +193,120 @@ public void testAdminAclsWithFileSystemBasedConfigurationProvider()
Assert.assertEquals(aclStringAfter, "world:anyone:rwcda");
}
@Test
public void testServiceAclsRefreshWithLocalConfigurationProvider() {
configuration.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
ResourceManager resourceManager = null;
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
} catch (Exception ex) {
fail("Using localConfigurationProvider. Should not get any exception.");
} finally {
if (resourceManager != null) {
resourceManager.stop();
}
}
}
@SuppressWarnings("resource")
@Test
public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
ResourceManager resourceManager = null;
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
// clean the remoteDirectory
cleanRemoteDirectory();
try {
resourceManager.adminService
.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: hadoop-policy.xml"));
}
String aclsString = "alice,bob users,wheel";
Configuration conf = new Configuration();
conf.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
conf.set("security.applicationclient.protocol.acl", aclsString);
String hadoopConfFile = writeConfigurationXML(conf, "hadoop-policy.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(hadoopConfFile));
resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
// verify service Acls refresh for AdminService
ServiceAuthorizationManager adminServiceServiceManager =
resourceManager.adminService.getServer()
.getServiceAuthorizationManager();
verifyServiceACLsRefresh(adminServiceServiceManager,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
// verify service ACLs refresh for ClientRMService
ServiceAuthorizationManager clientRMServiceServiceManager =
resourceManager.getRMContext().getClientRMService().getServer()
.getServiceAuthorizationManager();
verifyServiceACLsRefresh(clientRMServiceServiceManager,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
// verify service ACLs refresh for ApplicationMasterService
ServiceAuthorizationManager appMasterService =
resourceManager.getRMContext().getApplicationMasterService()
.getServer().getServiceAuthorizationManager();
verifyServiceACLsRefresh(appMasterService,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
// verify service ACLs refresh for ResourceTrackerService
ServiceAuthorizationManager RTService =
resourceManager.getRMContext().getResourceTrackerService()
.getServer().getServiceAuthorizationManager();
verifyServiceACLsRefresh(RTService,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
} finally {
if (resourceManager != null) {
resourceManager.stop();
}
}
}
private void verifyServiceACLsRefresh(ServiceAuthorizationManager manager,
Class<?> protocol, String aclString) {
for (Class<?> protocolClass : manager.getProtocolsWithAcls()) {
AccessControlList accessList =
manager.getProtocolsAcls(protocolClass);
if (protocolClass == protocol) {
Assert.assertEquals(accessList.getAclString(),
aclString);
} else {
Assert.assertEquals(accessList.getAclString(), "*");
}
}
}
@Test
public void
testRefreshSuperUserGroupsWithLocalConfigurationProvider() {