YARN-10110. Adding RouterPolicyProvider for RM Federation.
Contributed by Bilwa S T.
This commit is contained in:
parent
20903f72b4
commit
c83644deac
@ -404,6 +404,10 @@
|
|||||||
<Class name="org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider" />
|
<Class name="org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider" />
|
||||||
<Bug pattern="EI_EXPOSE_REP" />
|
<Bug pattern="EI_EXPOSE_REP" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider" />
|
||||||
|
<Bug pattern="EI_EXPOSE_REP" />
|
||||||
|
</Match>
|
||||||
|
|
||||||
|
|
||||||
<!-- Ignore EI_EXPOSE_REP2 in Log services -->
|
<!-- Ignore EI_EXPOSE_REP2 in Log services -->
|
||||||
|
@ -67,8 +67,10 @@ public class Router extends CompositeService {
|
|||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
||||||
private JvmPauseMonitor pauseMonitor;
|
private JvmPauseMonitor pauseMonitor;
|
||||||
private RouterClientRMService clientRMProxyService;
|
@VisibleForTesting
|
||||||
private RouterRMAdminService rmAdminProxyService;
|
protected RouterClientRMService clientRMProxyService;
|
||||||
|
@VisibleForTesting
|
||||||
|
protected RouterRMAdminService rmAdminProxyService;
|
||||||
private WebApp webApp;
|
private WebApp webApp;
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected String webAppAddress;
|
protected String webAppAddress;
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -28,8 +29,10 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -108,6 +111,7 @@
|
|||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -171,6 +175,12 @@ protected void serviceStart() throws Exception {
|
|||||||
this.server = rpc.getServer(ApplicationClientProtocol.class, this,
|
this.server = rpc.getServer(ApplicationClientProtocol.class, this,
|
||||||
listenerEndpoint, serverConf, null, numWorkerThreads);
|
listenerEndpoint, serverConf, null, numWorkerThreads);
|
||||||
|
|
||||||
|
// Enable service authorization?
|
||||||
|
if (conf.getBoolean(
|
||||||
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
||||||
|
refreshServiceAcls(conf, RouterPolicyProvider.getInstance());
|
||||||
|
}
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
LOG.info("Router ClientRMService listening on address: "
|
LOG.info("Router ClientRMService listening on address: "
|
||||||
+ this.server.getListenerAddress());
|
+ this.server.getListenerAddress());
|
||||||
@ -187,6 +197,11 @@ protected void serviceStop() throws Exception {
|
|||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Server getServer() {
|
||||||
|
return this.server;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the comma separated intercepter class names from the configuration.
|
* Returns the comma separated intercepter class names from the configuration.
|
||||||
*
|
*
|
||||||
@ -469,6 +484,11 @@ protected RequestInterceptorChainWrapper getInterceptorChain()
|
|||||||
return initializePipeline(user);
|
return initializePipeline(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void refreshServiceAcls(Configuration configuration,
|
||||||
|
PolicyProvider policyProvider) {
|
||||||
|
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the Request intercepter chains for all the users.
|
* Gets the Request intercepter chains for all the users.
|
||||||
*
|
*
|
||||||
|
@ -28,9 +28,11 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -67,6 +69,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -130,6 +133,11 @@ protected void serviceStart() throws Exception {
|
|||||||
this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
|
this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
|
||||||
this, listenerEndpoint, serverConf, null, numWorkerThreads);
|
this, listenerEndpoint, serverConf, null, numWorkerThreads);
|
||||||
|
|
||||||
|
if (conf.getBoolean(
|
||||||
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
||||||
|
refreshServiceAcls(conf, RouterPolicyProvider.getInstance());
|
||||||
|
}
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
LOG.info("Router RMAdminService listening on address: "
|
LOG.info("Router RMAdminService listening on address: "
|
||||||
+ this.server.getListenerAddress());
|
+ this.server.getListenerAddress());
|
||||||
@ -146,6 +154,16 @@ protected void serviceStop() throws Exception {
|
|||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void refreshServiceAcls(Configuration configuration,
|
||||||
|
PolicyProvider policyProvider) {
|
||||||
|
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Server getServer() {
|
||||||
|
return this.server;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the comma separated intercepter class names from the configuration.
|
* Returns the comma separated intercepter class names from the configuration.
|
||||||
*
|
*
|
||||||
|
@ -0,0 +1,66 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.router.security.authorize;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
|
import org.apache.hadoop.security.authorize.Service;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link PolicyProvider} for YARN Router server protocols.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class RouterPolicyProvider extends PolicyProvider {
|
||||||
|
|
||||||
|
private static volatile RouterPolicyProvider routerPolicyProvider = null;
|
||||||
|
|
||||||
|
private RouterPolicyProvider() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public static RouterPolicyProvider getInstance() {
|
||||||
|
if (routerPolicyProvider == null) {
|
||||||
|
synchronized (RouterPolicyProvider.class) {
|
||||||
|
if (routerPolicyProvider == null) {
|
||||||
|
routerPolicyProvider = new RouterPolicyProvider();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return routerPolicyProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Service[] ROUTER_SERVICES = new Service[] {
|
||||||
|
new Service(
|
||||||
|
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONCLIENT_PROTOCOL,
|
||||||
|
ApplicationClientProtocolPB.class),
|
||||||
|
new Service(
|
||||||
|
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCEMANAGER_ADMINISTRATION_PROTOCOL,
|
||||||
|
ResourceManagerAdministrationProtocolPB.class), };
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Service[] getServices() {
|
||||||
|
return ROUTER_SERVICES;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Router Security Authorization package. **/
|
||||||
|
package org.apache.hadoop.yarn.server.router.security.authorize;
|
||||||
|
|
||||||
|
|
@ -18,8 +18,14 @@
|
|||||||
package org.apache.hadoop.yarn.server.router;
|
package org.apache.hadoop.yarn.server.router;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
|
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -35,4 +41,50 @@ public void testJVMMetricsService() {
|
|||||||
assertEquals(3, router.getServices().size());
|
assertEquals(3, router.getServices().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServiceACLRefresh() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
|
||||||
|
true);
|
||||||
|
String aclsString = "alice,bob users,wheel";
|
||||||
|
conf.set("security.applicationclient.protocol.acl", aclsString);
|
||||||
|
conf.set("security.resourcemanager-administration.protocol.acl",
|
||||||
|
aclsString);
|
||||||
|
|
||||||
|
Router router = new Router();
|
||||||
|
router.init(conf);
|
||||||
|
router.start();
|
||||||
|
|
||||||
|
// verify service Acls refresh for RouterClientRMService
|
||||||
|
ServiceAuthorizationManager clientRMServiceManager =
|
||||||
|
router.clientRMProxyService.getServer().
|
||||||
|
getServiceAuthorizationManager();
|
||||||
|
verifyServiceACLsRefresh(clientRMServiceManager,
|
||||||
|
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class,
|
||||||
|
aclsString);
|
||||||
|
|
||||||
|
// verify service Acls refresh for RouterRMAdminService
|
||||||
|
ServiceAuthorizationManager routerAdminServiceManager =
|
||||||
|
router.rmAdminProxyService.getServer().getServiceAuthorizationManager();
|
||||||
|
verifyServiceACLsRefresh(routerAdminServiceManager,
|
||||||
|
org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB.class,
|
||||||
|
aclsString);
|
||||||
|
|
||||||
|
router.stop();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyServiceACLsRefresh(ServiceAuthorizationManager manager,
|
||||||
|
Class<?> protocol, String aclString) {
|
||||||
|
if (manager.getProtocolsWithAcls().size() == 0) {
|
||||||
|
fail("Acls are not refreshed for protocol " + protocol);
|
||||||
|
}
|
||||||
|
for (Class<?> protocolClass : manager.getProtocolsWithAcls()) {
|
||||||
|
AccessControlList accessList = manager.getProtocolsAcls(protocolClass);
|
||||||
|
if (protocolClass == protocol) {
|
||||||
|
Assert.assertEquals(accessList.getAclString(), aclString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user