YARN-1669. Modified RM HA handling of protocol level service-ACLS to be available across RM failover by making using of a remote configuration-provider. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1564549 ../../trunk/
Ran into minor import related conflicts that I merged manually.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1564552 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-04 22:57:37 +00:00
parent fb247bbc32
commit afa5cebbc1
10 changed files with 256 additions and 35 deletions

View File

@ -447,6 +447,14 @@ public abstract class Server {
serviceAuthorizationManager.refresh(conf, provider);
}
/**
* Refresh the service authorization ACL for the service handled by this server
* using the specified Configuration.
*/
public void refreshServiceAclWithConfigration(Configuration conf,
PolicyProvider provider) {
serviceAuthorizationManager.refreshWithConfiguration(conf, provider);
}
/**
* Returns a handle to the serviceAuthorizationManager (required in tests)
* @return instance of ServiceAuthorizationManager for this server

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
/**
* An authorization manager which handles service-level authorization
* for incoming service requests.
@ -120,19 +122,23 @@ public class ServiceAuthorizationManager {
// Make a copy of the original config, and load the policy file
Configuration policyConf = new Configuration(conf);
policyConf.addResource(policyFile);
refreshWithConfiguration(policyConf, provider);
}
public synchronized void refreshWithConfiguration(Configuration conf,
PolicyProvider provider) {
final Map<Class<?>, AccessControlList> newAcls =
new IdentityHashMap<Class<?>, AccessControlList>();
new IdentityHashMap<Class<?>, AccessControlList>();
// Parse the config file
Service[] services = provider.getServices();
if (services != null) {
for (Service service : services) {
AccessControlList acl =
new AccessControlList(
policyConf.get(service.getServiceKey(),
AccessControlList.WILDCARD_ACL_VALUE)
);
AccessControlList acl =
new AccessControlList(
conf.get(service.getServiceKey(),
AccessControlList.WILDCARD_ACL_VALUE)
);
newAcls.put(service.getProtocol(), acl);
}
}
@ -141,8 +147,13 @@ public class ServiceAuthorizationManager {
protocolToAcl = newAcls;
}
// Package-protected for use in tests.
Set<Class<?>> getProtocolsWithAcls() {
@VisibleForTesting
public Set<Class<?>> getProtocolsWithAcls() {
return protocolToAcl.keySet();
}
@VisibleForTesting
public AccessControlList getProtocolsAcls(Class<?> className) {
return protocolToAcl.get(className);
}
}

View File

@ -99,6 +99,10 @@ Release 2.4.0 - UNRELEASED
to be consistent with what exists (false) in the code and documentation.
(Kenji Kikushima via vinodkv)
YARN-1669. Modified RM HA handling of protocol level service-ACLS to
be available across RM failover by making using of a remote
configuration-provider. (Xuan Gong via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -40,6 +40,10 @@ public class YarnConfiguration extends Configuration {
@Private
public static final String CS_CONFIGURATION_FILE= "capacity-scheduler.xml";
@Private
public static final String HADOOP_POLICY_CONFIGURATION_FILE =
"hadoop-policy.xml";
@Private
public static final String YARN_SITE_XML_FILE = "yarn-site.xml";

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
@ -432,9 +433,8 @@ public class AdminService extends CompositeService implements
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException {
Configuration conf = new Configuration();
if (!conf.getBoolean(
RefreshServiceAclsRequest request) throws YarnException, IOException {
if (!getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
throw RPCUtil.getRemoteException(
@ -442,27 +442,38 @@ public class AdminService extends CompositeService implements
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
") not enabled."));
}
PolicyProvider policyProvider = new RMPolicyProvider();
refreshServiceAcls(conf, policyProvider);
if (isRMActive()) {
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
rmContext.getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
} else {
LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
"Clients, ApplicationMasters and NodeManagers");
String argName = "refreshServiceAcls";
if (!isRMActive()) {
RMAuditLogger.logFailure(UserGroupInformation.getCurrentUser()
.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh Service ACLs.");
throwStandbyException();
}
PolicyProvider policyProvider = new RMPolicyProvider();
Configuration conf =
getConfiguration(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
refreshServiceAcls(conf, policyProvider);
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
rmContext.getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
void refreshServiceAcls(Configuration configuration,
synchronized void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
if (this.configurationProvider instanceof LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
@Override
@ -519,4 +530,9 @@ public class AdminService extends CompositeService implements
public AccessControlList getAccessControlList() {
return this.adminAcl;
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -86,6 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
@ -102,6 +105,7 @@ public class ApplicationMasterService extends AbstractService implements
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
private boolean useLocalConfigurationProvider;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
@ -111,6 +115,15 @@ public class ApplicationMasterService extends AbstractService implements
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
@ -578,7 +591,12 @@ public class ApplicationMasterService extends AbstractService implements
public void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
if (this.useLocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
@Override
@ -604,4 +622,9 @@ public class ApplicationMasterService extends AbstractService implements
this.response = response;
}
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@ -102,6 +103,9 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
/**
* The client interface to the Resource Manager. This module handles all the rpc
@ -126,6 +130,7 @@ public class ClientRMService extends AbstractService implements
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
private boolean useLocalConfigurationProvider;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@ -143,6 +148,10 @@ public class ClientRMService extends AbstractService implements
@Override
protected void serviceInit(Configuration conf) throws Exception {
clientBindAddress = getBindAddress(conf);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -696,7 +705,12 @@ public class ClientRMService extends AbstractService implements
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
if (this.useLocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
private boolean isAllowedDelegationTokenOp() throws IOException {
@ -710,4 +724,9 @@ public class ClientRMService extends AbstractService implements
return true;
}
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -66,6 +67,8 @@ import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
public class ResourceTrackerService extends AbstractService implements
ResourceTracker {
@ -92,6 +95,7 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
private boolean useLocalConfigurationProvider;
static {
resync.setNodeAction(NodeAction.RESYNC);
@ -141,6 +145,10 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -415,6 +423,16 @@ public class ResourceTrackerService extends AbstractService implements
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
if (this.useLocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
policyProvider);
}
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -262,10 +263,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
if (!initialized) {
this.useLocalConfigurationProvider = conf.get(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS).equals(
"org.apache.hadoop.yarn.LocalConfigurationProvider");
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);

View File

@ -24,16 +24,19 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -190,6 +193,120 @@ public class TestRMAdminService {
Assert.assertEquals(aclStringAfter, "world:anyone:rwcda");
}
@Test
public void testServiceAclsRefreshWithLocalConfigurationProvider() {
configuration.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
ResourceManager resourceManager = null;
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
} catch (Exception ex) {
fail("Using localConfigurationProvider. Should not get any exception.");
} finally {
if (resourceManager != null) {
resourceManager.stop();
}
}
}
@SuppressWarnings("resource")
@Test
public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
ResourceManager resourceManager = null;
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
// clean the remoteDirectory
cleanRemoteDirectory();
try {
resourceManager.adminService
.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: hadoop-policy.xml"));
}
String aclsString = "alice,bob users,wheel";
Configuration conf = new Configuration();
conf.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
conf.set("security.applicationclient.protocol.acl", aclsString);
String hadoopConfFile = writeConfigurationXML(conf, "hadoop-policy.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(hadoopConfFile));
resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
// verify service Acls refresh for AdminService
ServiceAuthorizationManager adminServiceServiceManager =
resourceManager.adminService.getServer()
.getServiceAuthorizationManager();
verifyServiceACLsRefresh(adminServiceServiceManager,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
// verify service ACLs refresh for ClientRMService
ServiceAuthorizationManager clientRMServiceServiceManager =
resourceManager.getRMContext().getClientRMService().getServer()
.getServiceAuthorizationManager();
verifyServiceACLsRefresh(clientRMServiceServiceManager,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
// verify service ACLs refresh for ApplicationMasterService
ServiceAuthorizationManager appMasterService =
resourceManager.getRMContext().getApplicationMasterService()
.getServer().getServiceAuthorizationManager();
verifyServiceACLsRefresh(appMasterService,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
// verify service ACLs refresh for ResourceTrackerService
ServiceAuthorizationManager RTService =
resourceManager.getRMContext().getResourceTrackerService()
.getServer().getServiceAuthorizationManager();
verifyServiceACLsRefresh(RTService,
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
aclsString);
} finally {
if (resourceManager != null) {
resourceManager.stop();
}
}
}
private void verifyServiceACLsRefresh(ServiceAuthorizationManager manager,
Class<?> protocol, String aclString) {
for (Class<?> protocolClass : manager.getProtocolsWithAcls()) {
AccessControlList accessList =
manager.getProtocolsAcls(protocolClass);
if (protocolClass == protocol) {
Assert.assertEquals(accessList.getAclString(),
aclString);
} else {
Assert.assertEquals(accessList.getAclString(), "*");
}
}
}
@Test
public void
testRefreshSuperUserGroupsWithLocalConfigurationProvider() {