YARN-6539. Create SecureLogin inside Router. (#4712)

This commit is contained in:
slfan1989 2022-08-12 04:25:51 +08:00 committed by GitHub
parent 09cabaad68
commit 6ca2d3f848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 536 additions and 11 deletions

View File

@ -4107,6 +4107,16 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_ROUTER_WEBAPP_READ_TIMEOUT =
TimeUnit.SECONDS.toMillis(30);
/** Configuration key for the Kerberos keytab file used by the YARN Router. */
public static final String ROUTER_KEYTAB = ROUTER_PREFIX + "keytab.file";
/** Configuration key for the Kerberos principal of the YARN Router. */
public static final String ROUTER_PRINCIPAL = ROUTER_PREFIX + "kerberos.principal";
/**
 * Configuration key for the hostname of the YARN Router that is used
 * together with its Kerberos principal.
 */
public static final String ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY = ROUTER_PREFIX +
"kerberos.principal.hostname";
////////////////////////////////
// CSI Volume configs
////////////////////////////////

View File

@ -4888,4 +4888,37 @@
default implementation LocalityAppPlacementAllocator is used.
</description>
</property>
<property>
<name>yarn.router.keytab.file</name>
<value></value>
<description>
The keytab file used by router to login as its
service principal. The principal name is configured with
yarn.router.kerberos.principal.
</description>
</property>
<property>
<name>yarn.router.kerberos.principal</name>
<value></value>
<description>
The Router service principal. This is typically set to
router/_HOST@REALM.TLD. Each Router will substitute _HOST with its
own fully qualified hostname at startup. The _HOST placeholder
allows using the same configuration setting on all Routers in the setup.
</description>
</property>
<property>
<name>yarn.router.kerberos.principal.hostname</name>
<value></value>
<description>
Optional.
The hostname for the Router containing this
configuration file. Will be different for each machine.
Defaults to current hostname.
</description>
</property>
</configuration>

View File

@ -43,6 +43,14 @@ public abstract class SubClusterId implements Comparable<SubClusterId> {
return id;
}
/**
 * Create a {@link SubClusterId} from an integer identifier.
 *
 * @param subClusterId the integer identifier; it is stored in string form
 * @return a new record whose id is the string value of {@code subClusterId}
 */
@Private
@Unstable
public static SubClusterId newInstance(Integer subClusterId) {
  final String idText = String.valueOf(subClusterId);
  SubClusterId record = Records.newRecord(SubClusterId.class);
  record.setId(idText);
  return record;
}
/**
* Get the string identifier of the <em>subcluster</em> which is unique across
* the federated cluster. The identifier is static, i.e. preserved across

View File

@ -609,4 +609,10 @@ public final class FederationStateStoreFacade {
/**
 * Callback that maps an input of type {@code T} to a result of type
 * {@code TResult}. Implementations are allowed to throw any
 * {@link Exception}.
 *
 * @param <T> type of the input
 * @param <TResult> type of the result
 */
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}
/**
 * Expose the state store held by this facade; intended for tests only.
 *
 * @return the {@link FederationStateStore} currently in use
 */
@VisibleForTesting
public FederationStateStore getStateStore() {
  return this.stateStore;
}
}

View File

@ -382,8 +382,13 @@ public class UnmanagedApplicationManager {
protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
appSubmitter = UserGroupInformation.createProxyUser(this.submitter,
UserGroupInformation.getLoginUser());
} else {
appSubmitter = UserGroupInformation.createRemoteUser(this.submitter);
}
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
appSubmitter, null);

View File

@ -459,8 +459,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Get the running containers from home RM, note that we will also get the
// AM container itself from here. We don't need it, but no harm to put the
// map as well.
UserGroupInformation appSubmitter = UserGroupInformation
.createRemoteUser(getApplicationContext().getUser());
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
appSubmitter = UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
UserGroupInformation.getLoginUser());
} else {
appSubmitter = UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
}
ApplicationClientProtocol rmClient =
createHomeRMProxy(getApplicationContext(),
ApplicationClientProtocol.class, appSubmitter);

View File

@ -116,6 +116,19 @@
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>

View File

@ -19,12 +19,15 @@
package org.apache.hadoop.yarn.server.router;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
@ -88,7 +91,8 @@ public class Router extends CompositeService {
}
/**
 * Log the Router in using the keytab and principal configured under
 * {@link YarnConfiguration#ROUTER_KEYTAB} and
 * {@link YarnConfiguration#ROUTER_PRINCIPAL}.
 *
 * @throws IOException if resolving the hostname or logging in raises one
 */
protected void doSecureLogin() throws IOException {
  // Resolve the hostname first; it is passed along with the principal.
  final String hostName = getHostName(this.conf);
  SecurityUtil.login(this.conf, YarnConfiguration.ROUTER_KEYTAB,
      YarnConfiguration.ROUTER_PRINCIPAL, hostName);
}
@Override
@ -195,4 +199,31 @@ public class Router extends CompositeService {
System.exit(-1);
}
}
/**
 * Expose the client RM proxy service of this Router; intended for tests.
 *
 * @return the {@link RouterClientRMService} instance held by this Router
 */
@VisibleForTesting
public RouterClientRMService getClientRMProxyService() {
  return this.clientRMProxyService;
}
/**
 * Expose the RM admin proxy service of this Router; intended for tests.
 *
 * @return the {@link RouterRMAdminService} instance held by this Router
 */
@VisibleForTesting
public RouterRMAdminService getRmAdminProxyService() {
  return this.rmAdminProxyService;
}
/**
 * Resolve the hostname to use for this Router. A value configured under
 * {@link YarnConfiguration#ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY} wins;
 * when the key yields {@code null} the local host's name is looked up.
 *
 * @param config configuration to read the hostname from
 * @return the hostname (NB: may not be a FQDN)
 * @throws UnknownHostException if the local hostname cannot be determined
 */
private String getHostName(Configuration config)
    throws UnknownHostException {
  final String configured =
      config.get(YarnConfiguration.ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY);
  if (configured != null) {
    return configured;
  }
  // Nothing configured: fall back to the name of the local host.
  return InetAddress.getLocalHost().getHostName();
}
}

View File

@ -106,8 +106,9 @@ public abstract class AbstractClientRequestInterceptor
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,

View File

@ -1623,4 +1623,14 @@ public class FederationClientInterceptor
String.format("Can't Found applicationId = %s in any sub clusters", applicationId);
throw new YarnException(errorMsg);
}
/**
 * Expose the federation state store facade used by this interceptor;
 * intended for tests.
 *
 * @return the {@link FederationStateStoreFacade} held by this interceptor
 */
@VisibleForTesting
public FederationStateStoreFacade getFederationFacade() {
  return this.federationFacade;
}
/**
 * Expose the per-sub-cluster client proxies; intended for tests. The
 * returned map is the interceptor's own instance, not a copy.
 *
 * @return map from sub-cluster id to its {@link ApplicationClientProtocol}
 */
@VisibleForTesting
public Map<SubClusterId, ApplicationClientProtocol> getClientRMProxies() {
  return this.clientRMProxies;
}
}

View File

@ -444,7 +444,7 @@ public class RouterClientRMService extends AbstractService
}
@VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain()
public RequestInterceptorChainWrapper getInterceptorChain()
throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName();
RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
@ -553,4 +553,9 @@ public class RouterClientRMService extends AbstractService
rootInterceptor.shutdown();
}
}
/**
 * Expose the per-user interceptor pipelines; intended for tests. The
 * returned map is the service's own instance, not a copy.
 *
 * @return map from user name to that user's interceptor chain wrapper
 */
@VisibleForTesting
public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() {
  return this.userPipelineMap;
}
}

View File

@ -80,8 +80,9 @@ public class DefaultRMAdminRequestInterceptor
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,

View File

@ -157,7 +157,7 @@ public class RouterRMAdminService extends AbstractService
}
@VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain()
public RequestInterceptorChainWrapper getInterceptorChain()
throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName();
RequestInterceptorChainWrapper chain = userPipelineMap.get(user);

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.secure;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
public abstract class AbstractSecureRouterTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSecureRouterTest.class);
////////////////////////////////
// Kerberos Constants
////////////////////////////////
public static final String REALM = "EXAMPLE.COM";
public static final String ROUTER = "router";
public static final String LOCALHOST = "localhost";
public static final String IP127001 = "127.0.0.1";
public static final String ROUTER_LOCALHOST = "router/" + LOCALHOST;
public static final String ROUTER_127001 = "router/" + IP127001;
public static final String ROUTER_REALM = "router@" + REALM;
public static final String ROUTER_LOCALHOST_REALM = ROUTER_LOCALHOST + "@" + REALM;
public static final String SUN_SECURITY_KRB5_DEBUG = "sun.security.krb5.debug";
public static final String KERBEROS = "kerberos";
////////////////////////////////
// BeforeSecureRouterTestClass Init
////////////////////////////////
private static MiniKdc kdc;
private static File routerKeytab;
private static File kdcWorkDir;
private static Configuration conf;
////////////////////////////////
// Specific Constant
// Like Mem, VCore, ClusterNum
////////////////////////////////
private static final int NUM_SUBCLUSTER = 4;
private static final int GB = 1024;
private static final int NM_MEMORY = 8 * GB;
private static final int NM_VCORE = 4;
////////////////////////////////
// Test use in subclasses
////////////////////////////////
private Router router = null;
private static ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
new ConcurrentHashMap<>();
/**
 * Starts the KDC, creates the Router keytab and builds the shared
 * Kerberos-enabled configuration used by the tests of this class.
 *
 * @throws Exception an error occurred.
 */
@BeforeClass
public static void beforeSecureRouterTestClass() throws Exception {
  // The KDC comes first: it produces the keytab referenced further down.
  setupKDCAndPrincipals();

  conf = new YarnConfiguration();

  // Kerberos authentication with authorization switched on.
  conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
  conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);

  // Kerberos identity of the Router: keytab and principal.
  conf.set(YarnConfiguration.ROUTER_KEYTAB, routerKeytab.getAbsolutePath());
  conf.set(YarnConfiguration.ROUTER_PRINCIPAL, ROUTER_LOCALHOST_REALM);

  // Router bind address and client interceptor pipeline.
  conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
      FederationClientInterceptor.class.getName());
  conf.set(YarnConfiguration.ROUTER_BIND_HOST, "0.0.0.0");
}
/**
 * Sets up the KDC and Principals.
 *
 * Creates the KDC working directory below the directory named by the
 * {@code test.dir} system property (default {@code target}), starts a
 * MiniKdc with debug enabled and creates the Router keytab.
 *
 * @throws Exception an error occurred.
 */
public static void setupKDCAndPrincipals() throws Exception {
  // set up the KDC
  File target = new File(System.getProperty("test.dir", "target"));
  kdcWorkDir = new File(target, "kdc");
  // mkdirs() returns false when the directory already exists, so a false
  // result is only an error if the path is still not a directory.
  if (!kdcWorkDir.mkdirs()) {
    assertTrue("Could not create KDC work dir " + kdcWorkDir,
        kdcWorkDir.isDirectory());
  }
  Properties kdcConf = MiniKdc.createConf();
  kdcConf.setProperty(MiniKdc.DEBUG, "true");
  kdc = new MiniKdc(kdcConf, kdcWorkDir);
  kdc.start();
  routerKeytab = createKeytab(ROUTER, "router.keytab");
}
/**
 * Starts one secure MockRM per sub-cluster and registers a single node
 * with each. Sub-clusters that already have a MockRM are left untouched.
 *
 * @throws Exception an error occurred.
 */
public static void setupSecureMockRM() throws Exception {
  for (int index = 0; index < NUM_SUBCLUSTER; index++) {
    SubClusterId subClusterId = SubClusterId.newInstance(index);
    if (!mockRMs.containsKey(subClusterId)) {
      MockRM rm = new TestRMRestart.TestSecurityMockRM(conf);
      rm.start();
      rm.registerNode("127.0.0.1:1234", NM_MEMORY, NM_VCORE);
      mockRMs.put(subClusterId, rm);
    }
  }
}
/**
 * Create the keytab for the given principal. The keytab contains the raw
 * principal as well as $principal/localhost and $principal/127.0.0.1.
 *
 * @param principal principal short name.
 * @param filename filename of keytab.
 * @return file of keytab.
 * @throws Exception an error occurred.
 */
public static File createKeytab(String principal, String filename) throws Exception {
  assertTrue("empty principal", StringUtils.isNotBlank(principal));
  // The second argument is the keytab file name, not a host.
  assertTrue("empty filename", StringUtils.isNotBlank(filename));
  assertNotNull("null KDC", kdc);
  File keytab = new File(kdcWorkDir, filename);
  kdc.createPrincipal(keytab,
      principal,
      principal + "/" + LOCALHOST,
      principal + "/" + IP127001);
  return keytab;
}
/**
 * Start the router in secure mode. Fails if a router was already started
 * by this test instance.
 */
public synchronized void startSecureRouter() {
  assertNull("Router is already running", this.router);
  // UGI must pick up the Kerberos settings before the Router initializes.
  UserGroupInformation.setConfiguration(conf);
  this.router = new Router();
  this.router.init(conf);
  this.router.start();
}
/**
 * Shut down the KDC service, if one is running.
 *
 * @throws Exception an error occurred.
 */
public static void teardownKDC() throws Exception {
  if (kdc == null) {
    return;
  }
  kdc.stop();
  kdc = null;
}
/**
 * Stop the secure router, if one was started by this test instance.
 *
 * @throws Exception an error occurred.
 */
protected synchronized void stopSecureRouter() throws Exception {
  if (this.router == null) {
    return;
  }
  this.router.stop();
  this.router = null;
}
/**
 * Stop the entire test service: every MockRM that was started for the
 * sub-clusters, followed by the KDC.
 *
 * @throws Exception an error occurred.
 */
@AfterClass
public static void afterSecureRouterTest() throws Exception {
  try {
    // Stop the MockRMs started by setupSecureMockRM so their services do
    // not outlive this test class.
    for (MockRM mockRM : mockRMs.values()) {
      mockRM.stop();
    }
    mockRMs.clear();
  } finally {
    // Always tear the KDC down, even if stopping an RM failed.
    LOG.info("teardown of kdc instance.");
    teardownKDC();
  }
}
/**
 * @return the shared MiniKdc, or {@code null} when it is not set up or
 *         has been torn down
 */
public static MiniKdc getKdc() {
  return kdc;
}
/**
 * @return the router started by this test instance, or {@code null} when
 *         none is running
 */
public Router getRouter() {
  return this.router;
}
/**
 * @return the shared map of MockRMs keyed by sub-cluster id (the map
 *         itself, not a copy)
 */
public static ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
  return mockRMs;
}
}

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.secure;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
public class TestSecureLogins extends AbstractSecureRouterTest {
private static final Logger LOG = LoggerFactory.getLogger(TestSecureLogins.class);
/** Checks that the KDC reports a realm and logs the Router principal. */
@Test
public void testHasRealm() throws Throwable {
  final String realm = getRealm();
  Assert.assertNotNull(realm);
  LOG.info("Router principal = {}", getPrincipalAndRealm(ROUTER_LOCALHOST));
}
/**
 * Starts the Router in secure mode and checks that it exposes the
 * expected number of services.
 */
@Test
public void testRouterSecureLogin() throws Exception {
  try {
    startSecureRouter();
    List<Service> services = this.getRouter().getServices();
    Assert.assertNotNull(services);
    Assert.assertEquals(3, services.size());
  } finally {
    // Stop the Router even when an assertion fails so it does not leak.
    stopSecureRouter();
  }
}
/**
 * Issues a simple getClusterMetrics call through the Router's client RM
 * service while running in secure mode.
 */
@Test
public void testRouterClientRMService() throws Exception {
  try {
    // Start the Router in Secure Mode
    startSecureRouter();

    // Start RM and RouterClientRMService in Secure mode
    setupSecureMockRM();
    initRouterClientRMService();

    // Test the simple rpc call of the Router in the Secure environment
    RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
    GetClusterMetricsRequest metricsRequest = GetClusterMetricsRequest.newInstance();
    GetClusterMetricsResponse metricsResponse =
        routerClientRMService.getClusterMetrics(metricsRequest);
    Assert.assertNotNull(metricsResponse);
    YarnClusterMetrics clusterMetrics = metricsResponse.getClusterMetrics();
    // Each of the 4 mock sub-cluster RMs registers exactly one node.
    Assert.assertEquals(4, clusterMetrics.getNumNodeManagers());
    Assert.assertEquals(0, clusterMetrics.getNumLostNodeManagers());
  } finally {
    // Stop the Router in Secure Mode, even when an assertion fails
    stopSecureRouter();
  }
}
/**
 * Issues a simple refreshNodes call through the Router's RM admin service
 * while running in secure mode.
 */
@Test
public void testRouterRMAdminService() throws Exception {
  try {
    // Start the Router in Secure Mode
    startSecureRouter();

    // Start RM and RouterRMAdminService in Secure mode
    setupSecureMockRM();
    initRouterRMAdminService();

    // Test the simple rpc call of the Router in the Secure environment
    RouterRMAdminService routerRMAdminService = this.getRouter().getRmAdminProxyService();
    RefreshNodesRequest refreshNodesRequest = RefreshNodesRequest.newInstance();
    RefreshNodesResponse refreshNodesResponse =
        routerRMAdminService.refreshNodes(refreshNodesRequest);
    Assert.assertNotNull(refreshNodesResponse);
  } finally {
    // Stop the Router in Secure Mode, even when an assertion fails
    stopSecureRouter();
  }
}
/**
 * Append the KDC realm to a principal.
 *
 * @param principal principal without realm
 * @return {@code principal@REALM}
 */
public static String getPrincipalAndRealm(String principal) {
  final String realm = getRealm();
  return principal + "@" + realm;
}
/**
 * @return the realm reported by the test KDC
 */
protected static String getRealm() {
  return getKdc().getRealm();
}
/**
 * Wires the Router's federation client interceptor to the MockRMs: every
 * mock sub-cluster is registered in the state store and, unless a proxy
 * is already present for it, its client RM service is installed as the
 * proxy for that sub-cluster.
 *
 * @throws Exception an error occurred.
 */
private void initRouterClientRMService() throws Exception {
  Router secureRouter = this.getRouter();
  Map<SubClusterId, MockRM> rmBySubCluster = getMockRMs();

  // Walk from the Router to the interceptor at the root of the chain.
  RouterClientRMService clientRMService = secureRouter.getClientRMProxyService();
  RouterClientRMService.RequestInterceptorChainWrapper chain =
      clientRMService.getInterceptorChain();
  FederationClientInterceptor federationInterceptor =
      (FederationClientInterceptor) chain.getRootInterceptor();

  FederationStateStore store = federationInterceptor.getFederationFacade().getStateStore();
  FederationStateStoreTestUtil storeUtil = new FederationStateStoreTestUtil(store);
  Map<SubClusterId, ApplicationClientProtocol> proxies =
      federationInterceptor.getClientRMProxies();

  if (!MapUtils.isNotEmpty(rmBySubCluster)) {
    return;
  }
  for (Map.Entry<SubClusterId, MockRM> rmEntry : rmBySubCluster.entrySet()) {
    SubClusterId subClusterId = rmEntry.getKey();
    MockRM rm = rmEntry.getValue();
    storeUtil.registerSubCluster(subClusterId);
    if (!proxies.containsKey(subClusterId)) {
      proxies.put(subClusterId, rm.getClientRMService());
    }
  }
}
/**
 * Points the Router's default RM admin interceptor at the admin service
 * of the MockRM registered for sub-cluster 0.
 *
 * @throws Exception an error occurred.
 */
private void initRouterRMAdminService() throws Exception {
  Router secureRouter = this.getRouter();
  Map<SubClusterId, MockRM> rmBySubCluster = getMockRMs();
  MockRM firstRM = rmBySubCluster.get(SubClusterId.newInstance(0));

  // The root of the admin chain is expected to be the default interceptor.
  RouterRMAdminService adminService = secureRouter.getRmAdminProxyService();
  RouterRMAdminService.RequestInterceptorChainWrapper adminChain =
      adminService.getInterceptorChain();
  DefaultRMAdminRequestInterceptor adminInterceptor =
      (DefaultRMAdminRequestInterceptor) adminChain.getRootInterceptor();
  adminInterceptor.setRMAdmin(firstRM.getAdminService());
}
}

View File

@ -258,6 +258,16 @@ Optional:
|`yarn.federation.cache-ttl.secs` | `60` | The Router caches information, and this is the time to live before the cache is invalidated. |
|`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. |
Security:
Kerberos is supported in federation.
| Property | Example | Description |
|:---- |:---- |:---- |
| `yarn.router.keytab.file` | | The keytab file used by router to login as its service principal. The principal name is configured with 'yarn.router.kerberos.principal'.|
| `yarn.router.kerberos.principal` | | The Router service principal. This is typically set to router/_HOST@REALM.TLD. Each Router will substitute _HOST with its own fully qualified hostname at startup. The _HOST placeholder allows using the same configuration setting on all Routers in setup. |
| `yarn.router.kerberos.principal.hostname` | | Optional. The hostname for the Router containing this configuration file. Will be different for each machine. Defaults to current hostname. |
###ON NMs:
These are extra configurations that should appear in the **conf/yarn-site.xml** at each NodeManager.