YARN-5647. [ATSv2 Security] Collector side changes for loading auth filters and principals. Contributed by Varun Saxena

Jian He 2017-06-07 13:45:34 -07:00 committed by Varun Saxena
parent 24447b3626
commit 879de51206
15 changed files with 488 additions and 153 deletions
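At a glance, the collector-side wiring introduced by this patch does three things: it stands up an ATSv2 delegation token secret manager service, folds the timeline authentication filter into hadoop.http.filter.initializers, and, when the collector runs outside the NodeManager, logs in with the configured timeline principal and keytab. The following is a condensed, illustrative sketch of that flow, not the committed code; the driver class name, principal, keytab path, and hostname are placeholders.

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;

// Hypothetical driver, for illustration only.
public class CollectorSecuritySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Placeholder principal and keytab; in a real cluster these come from yarn-site.xml.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "HTTP/_HOST@EXAMPLE.COM");
    conf.set(YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
        "/etc/security/keytabs/timeline.service.keytab");

    // Delegation token secret manager service for ATSv2, added by this patch.
    TimelineV2DelegationTokenSecretManagerService tokenMgrService =
        new TimelineV2DelegationTokenSecretManagerService();
    tokenMgrService.init(conf);
    tokenMgrService.start();

    // Keep the cluster's configured filter initializers, add the timeline auth
    // filter, and write the merged list back to hadoop.http.filter.initializers.
    String initializers = conf.get("hadoop.http.filter.initializers", "");
    Set<String> defaultInitializers = new LinkedHashSet<String>();
    TimelineServerUtils.addTimelineAuthFilter(
        initializers, defaultInitializers, tokenMgrService);
    TimelineServerUtils.setTimelineFilters(conf, initializers, defaultInitializers);

    // Kerberos login, needed only when the collector is not hosted by the NM.
    if (UserGroupInformation.isSecurityEnabled()) {
      SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
          YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "collector.example.com");
    }

    tokenMgrService.stop();
  }
}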

View File

@ -20,14 +20,14 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
@ -47,10 +47,9 @@
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.timeline.security.TimelineV1DelegationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.timeline.webapp.CrossOriginFilterInitializer;
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -75,7 +74,7 @@ public class ApplicationHistoryServer extends CompositeService {
private ApplicationACLsManager aclsManager;
private ApplicationHistoryManager historyManager;
private TimelineStore timelineStore;
private TimelineDelegationTokenSecretManagerService secretManagerService;
private TimelineV1DelegationTokenSecretManagerService secretManagerService;
private TimelineDataManager timelineDataManager;
private WebApp webApp;
private JvmPauseMonitor pauseMonitor;
@ -223,9 +222,9 @@ private TimelineStore createTimelineStore(
TimelineStore.class), conf);
}
private TimelineDelegationTokenSecretManagerService
private TimelineV1DelegationTokenSecretManagerService
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
return new TimelineDelegationTokenSecretManagerService();
return new TimelineV1DelegationTokenSecretManagerService();
}
private TimelineDataManager createTimelineDataManager(Configuration conf) {
@ -237,63 +236,33 @@ private TimelineDataManager createTimelineDataManager(Configuration conf) {
@SuppressWarnings("unchecked")
private void startWebApp() {
Configuration conf = getConfig();
TimelineAuthenticationFilter.setTimelineDelegationTokenSecretManager(
secretManagerService.getTimelineDelegationTokenSecretManager());
// Always load pseudo authentication filter to parse "user.name" in an URL
// to identify a HTTP request's user in insecure mode.
// When Kerberos authentication type is set (i.e., secure mode is turned on),
// the customized filter will be loaded by the timeline server to do Kerberos
// + DT authentication.
String initializers = conf.get("hadoop.http.filter.initializers");
boolean modifiedInitializers = false;
initializers =
initializers == null || initializers.length() == 0 ? "" : initializers;
String initializers = conf.get("hadoop.http.filter.initializers", "");
Set<String> defaultInitializers = new LinkedHashSet<String>();
// Add CORS filter
if (!initializers.contains(CrossOriginFilterInitializer.class.getName())) {
if(conf.getBoolean(YarnConfiguration
.TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED, YarnConfiguration
.TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT)) {
if (initializers.contains(HttpCrossOriginFilterInitializer.class.getName())) {
initializers =
initializers.replaceAll(HttpCrossOriginFilterInitializer.class.getName(),
if(conf.getBoolean(YarnConfiguration.
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED,
YarnConfiguration.
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT)) {
if (initializers.contains(
HttpCrossOriginFilterInitializer.class.getName())) {
initializers = initializers.replaceAll(
HttpCrossOriginFilterInitializer.class.getName(),
CrossOriginFilterInitializer.class.getName());
} else {
defaultInitializers.add(CrossOriginFilterInitializer.class.getName());
}
else {
if (initializers.length() != 0) {
initializers += ",";
}
initializers += CrossOriginFilterInitializer.class.getName();
}
modifiedInitializers = true;
}
}
if (!initializers.contains(TimelineAuthenticationFilterInitializer.class
.getName())) {
if (initializers.length() != 0) {
initializers += ",";
}
initializers += TimelineAuthenticationFilterInitializer.class.getName();
modifiedInitializers = true;
}
String[] parts = initializers.split(",");
ArrayList<String> target = new ArrayList<String>();
for (String filterInitializer : parts) {
filterInitializer = filterInitializer.trim();
if (filterInitializer.equals(AuthenticationFilterInitializer.class
.getName())) {
modifiedInitializers = true;
continue;
}
target.add(filterInitializer);
}
String actualInitializers =
org.apache.commons.lang.StringUtils.join(target, ",");
if (modifiedInitializers) {
conf.set("hadoop.http.filter.initializers", actualInitializers);
}
TimelineServerUtils.addTimelineAuthFilter(
initializers, defaultInitializers, secretManagerService);
TimelineServerUtils.setTimelineFilters(
conf, initializers, defaultInitializers);
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@ -37,18 +36,16 @@
import org.slf4j.LoggerFactory;
/**
* The service wrapper of {@link TimelineDelegationTokenSecretManager}
* The service wrapper of {@link TimelineV1DelegationTokenSecretManager}.
*/
@Private
@Unstable
public class TimelineDelegationTokenSecretManagerService extends
AbstractService {
private TimelineDelegationTokenSecretManager secretManager = null;
public class TimelineV1DelegationTokenSecretManagerService extends
TimelineDelgationTokenSecretManagerService {
private TimelineStateStore stateStore = null;
public TimelineDelegationTokenSecretManagerService() {
super(TimelineDelegationTokenSecretManagerService.class.getName());
public TimelineV1DelegationTokenSecretManagerService() {
super(TimelineV1DelegationTokenSecretManagerService.class.getName());
}
@Override
@ -58,19 +55,7 @@ protected void serviceInit(Configuration conf) throws Exception {
stateStore = createStateStore(conf);
stateStore.init(conf);
}
long secretKeyInterval =
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL);
long tokenMaxLifetime =
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME);
long tokenRenewInterval =
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000, stateStore);
super.init(conf);
super.serviceInit(conf);
}
@Override
@ -78,10 +63,9 @@ protected void serviceStart() throws Exception {
if (stateStore != null) {
stateStore.start();
TimelineServiceState state = stateStore.loadState();
secretManager.recover(state);
((TimelineV1DelegationTokenSecretManager)
getTimelineDelegationTokenSecretManager()).recover(state);
}
secretManager.startThreads();
super.serviceStart();
}
@ -90,9 +74,18 @@ protected void serviceStop() throws Exception {
if (stateStore != null) {
stateStore.stop();
}
super.serviceStop();
}
secretManager.stopThreads();
super.stop();
@Override
protected AbstractDelegationTokenSecretManager
<TimelineDelegationTokenIdentifier>
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
long tokenMaxLifetime, long tokenRenewInterval,
long tokenRemovalScanInterval) {
return new TimelineV1DelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval,
stateStore);
}
protected TimelineStateStore createStateStore(
@ -104,27 +97,20 @@ protected TimelineStateStore createStateStore(
}
/**
* Ge the instance of {link #TimelineDelegationTokenSecretManager}
*
* @return the instance of {link #TimelineDelegationTokenSecretManager}
* Delegation token secret manager for ATSv1 and ATSv1.5.
*/
public TimelineDelegationTokenSecretManager
getTimelineDelegationTokenSecretManager() {
return secretManager;
}
@Private
@Unstable
public static class TimelineDelegationTokenSecretManager extends
public static class TimelineV1DelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
public static final Logger LOG =
LoggerFactory.getLogger(TimelineDelegationTokenSecretManager.class);
LoggerFactory.getLogger(TimelineV1DelegationTokenSecretManager.class);
private TimelineStateStore stateStore;
/**
* Create a timeline secret manager
* Create a timeline v1 secret manager.
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
* new secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
@ -135,7 +121,7 @@ public static class TimelineDelegationTokenSecretManager extends
* scanned for expired tokens in milliseconds
* @param stateStore timeline service state store
*/
public TimelineDelegationTokenSecretManager(
public TimelineV1DelegationTokenSecretManager(
long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
@ -236,5 +222,4 @@ public void recover(TimelineServiceState state) throws IOException {
}
}
}
}

View File

@ -55,27 +55,31 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Test cases for authentication via TimelineAuthenticationFilter while
* publishing entities for ATSv1.
*/
@RunWith(Parameterized.class)
public class TestTimelineAuthenticationFilter {
public class TestTimelineAuthenticationFilterForV1 {
private static final String FOO_USER = "foo";
private static final String BAR_USER = "bar";
private static final String HTTP_USER = "HTTP";
private static final File testRootDir = new File(
private static final File TEST_ROOT_DIR = new File(
System.getProperty("test.build.dir", "target/test-dir"),
TestTimelineAuthenticationFilter.class.getName() + "-root");
TestTimelineAuthenticationFilterForV1.class.getName() + "-root");
private static File httpSpnegoKeytabFile = new File(
KerberosTestUtils.getKeytabFile());
private static String httpSpnegoPrincipal =
KerberosTestUtils.getServerPrincipal();
private static final String BASEDIR =
System.getProperty("test.build.dir", "target/test-dir") + "/"
+ TestTimelineAuthenticationFilter.class.getSimpleName();
+ TestTimelineAuthenticationFilterForV1.class.getSimpleName();
@Parameterized.Parameters
public static Collection<Object[]> withSsl() {
return Arrays.asList(new Object[][] { { false }, { true } });
return Arrays.asList(new Object[][] {{false}, {true}});
}
private static MiniKdc testMiniKDC;
@ -85,14 +89,14 @@ public static Collection<Object[]> withSsl() {
private static Configuration conf;
private static boolean withSsl;
public TestTimelineAuthenticationFilter(boolean withSsl) {
TestTimelineAuthenticationFilter.withSsl = withSsl;
public TestTimelineAuthenticationFilterForV1(boolean withSsl) {
TestTimelineAuthenticationFilterForV1.withSsl = withSsl;
}
@BeforeClass
public static void setup() {
try {
testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
testMiniKDC = new MiniKdc(MiniKdc.createConf(), TEST_ROOT_DIR);
testMiniKDC.start();
testMiniKDC.createPrincipal(
httpSpnegoKeytabFile, HTTP_USER + "/localhost");
@ -111,11 +115,11 @@ public static void setup() {
KerberosAuthenticationHandler.KEYTAB,
httpSpnegoKeytabFile.getAbsolutePath());
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
"kerberos");
conf.set(YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL,
httpSpnegoPrincipal);
httpSpnegoPrincipal);
conf.set(YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
httpSpnegoKeytabFile.getAbsolutePath());
httpSpnegoKeytabFile.getAbsolutePath());
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
@ -136,8 +140,8 @@ public static void setup() {
FileUtil.fullyDelete(base);
base.mkdirs();
keystoresDir = new File(BASEDIR).getAbsolutePath();
sslConfDir =
KeyStoreTestUtil.getClasspathDir(TestTimelineAuthenticationFilter.class);
sslConfDir = KeyStoreTestUtil.getClasspathDir(
TestTimelineAuthenticationFilterForV1.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
}
@ -145,6 +149,7 @@ public static void setup() {
testTimelineServer.init(conf);
testTimelineServer.start();
} catch (Exception e) {
e.printStackTrace();
assertTrue("Couldn't setup TimelineServer", false);
}
}
@ -181,14 +186,14 @@ public Void call() throws Exception {
TimelineClient client = createTimelineClientForUGI();
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType(
TestTimelineAuthenticationFilter.class.getName());
TestTimelineAuthenticationFilterForV1.class.getName());
entityToStore.setEntityId("entity1");
entityToStore.setStartTime(0L);
TimelinePutResponse putResponse = client.putEntities(entityToStore);
Assert.assertEquals(0, putResponse.getErrors().size());
TimelineEntity entityToRead =
testTimelineServer.getTimelineStore().getEntity(
"entity1", TestTimelineAuthenticationFilter.class.getName(), null);
testTimelineServer.getTimelineStore().getEntity("entity1",
TestTimelineAuthenticationFilterForV1.class.getName(), null);
Assert.assertNotNull(entityToRead);
return null;
}
@ -202,13 +207,14 @@ public void testPutDomains() throws Exception {
public Void call() throws Exception {
TimelineClient client = createTimelineClientForUGI();
TimelineDomain domainToStore = new TimelineDomain();
domainToStore.setId(TestTimelineAuthenticationFilter.class.getName());
domainToStore.setId(
TestTimelineAuthenticationFilterForV1.class.getName());
domainToStore.setReaders("*");
domainToStore.setWriters("*");
client.putDomain(domainToStore);
TimelineDomain domainToRead =
testTimelineServer.getTimelineStore().getDomain(
TestTimelineAuthenticationFilter.class.getName());
TestTimelineAuthenticationFilterForV1.class.getName());
Assert.assertNotNull(domainToRead);
return null;
}
@ -218,22 +224,24 @@ public Void call() throws Exception {
@Test
public void testDelegationTokenOperations() throws Exception {
TimelineClient httpUserClient =
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<TimelineClient>() {
@Override
public TimelineClient call() throws Exception {
return createTimelineClientForUGI();
}
});
KerberosTestUtils.doAs(HTTP_USER + "/localhost",
new Callable<TimelineClient>() {
@Override
public TimelineClient call() throws Exception {
return createTimelineClientForUGI();
}
});
UserGroupInformation httpUser =
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<UserGroupInformation>() {
@Override
public UserGroupInformation call() throws Exception {
return UserGroupInformation.getCurrentUser();
}
});
KerberosTestUtils.doAs(HTTP_USER + "/localhost",
new Callable<UserGroupInformation>() {
@Override
public UserGroupInformation call() throws Exception {
return UserGroupInformation.getCurrentUser();
}
});
// Let HTTP user to get the delegation for itself
Token<TimelineDelegationTokenIdentifier> token =
httpUserClient.getDelegationToken(httpUser.getShortUserName());
httpUserClient.getDelegationToken(httpUser.getShortUserName());
Assert.assertNotNull(token);
TimelineDelegationTokenIdentifier tDT = token.decodeIdentifier();
Assert.assertNotNull(tDT);
@ -317,7 +325,8 @@ public TimelineClient run() {
barUserClient.getDelegationToken(httpUser.getShortUserName());
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof AuthorizationException || e.getCause() instanceof AuthenticationException);
Assert.assertTrue(e.getCause() instanceof AuthorizationException ||
e.getCause() instanceof AuthenticationException);
}
}
}

View File

@ -23,27 +23,33 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService.TimelineDelegationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
/**
* Timeline authentication filter provides delegation token support for ATSv1
* and ATSv2.
*/
@Private
@Unstable
public class TimelineAuthenticationFilter
extends DelegationTokenAuthenticationFilter {
private static TimelineDelegationTokenSecretManager secretManager;
private static AbstractDelegationTokenSecretManager
<TimelineDelegationTokenIdentifier> secretManager;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
filterConfig.getServletContext().setAttribute(
DelegationTokenAuthenticationFilter.DELEGATION_TOKEN_SECRET_MANAGER_ATTR,
secretManager);
DelegationTokenAuthenticationFilter.
DELEGATION_TOKEN_SECRET_MANAGER_ATTR, secretManager);
super.init(filterConfig);
}
public static void setTimelineDelegationTokenSecretManager(
TimelineDelegationTokenSecretManager secretManager) {
TimelineAuthenticationFilter.secretManager = secretManager;
AbstractDelegationTokenSecretManager
<TimelineDelegationTokenIdentifier> secretMgr) {
TimelineAuthenticationFilter.secretManager = secretMgr;
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
/**
* Abstract implementation of delegation token manager service for different
* versions of timeline service.
*/
public abstract class TimelineDelgationTokenSecretManagerService extends
AbstractService {
public TimelineDelgationTokenSecretManagerService(String name) {
super(name);
}
private static long delegationTokenRemovalScanInterval = 3600000L;
private AbstractDelegationTokenSecretManager
<TimelineDelegationTokenIdentifier> secretManager = null;
@Override
protected void serviceInit(Configuration conf) throws Exception {
long secretKeyInterval =
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL);
long tokenMaxLifetime =
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME);
long tokenRenewInterval =
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
secretManager = createTimelineDelegationTokenSecretManager(
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval,
delegationTokenRemovalScanInterval);
super.init(conf);
}
protected abstract
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier>
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
long tokenMaxLifetime, long tokenRenewInterval,
long tokenRemovalScanInterval);
@Override
protected void serviceStart() throws Exception {
secretManager.startThreads();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
secretManager.stopThreads();
super.stop();
}
public AbstractDelegationTokenSecretManager
<TimelineDelegationTokenIdentifier>
getTimelineDelegationTokenSecretManager() {
return secretManager;
}
}
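The intervals read in serviceInit() above are the existing timeline delegation token settings, with defaults taken from YarnConfiguration; the removal-scan interval is fixed at one hour. A minimal lifecycle sketch follows, assuming the ATSv2 subclass added elsewhere in this patch; the override values and driver class are purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService;

// Hypothetical driver, for illustration only.
public class DelegationTokenServiceSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Illustrative overrides; omit them to use the YarnConfiguration defaults.
    conf.setLong(YarnConfiguration.TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL,
        24 * 60 * 60 * 1000L);
    conf.setLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME,
        7 * 24 * 60 * 60 * 1000L);
    conf.setLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
        24 * 60 * 60 * 1000L);

    TimelineDelgationTokenSecretManagerService service =
        new TimelineV2DelegationTokenSecretManagerService();
    service.init(conf);   // builds the concrete secret manager via the factory method
    service.start();      // starts the key-rolling and token-expiry threads

    AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> manager =
        service.getTimelineDelegationTokenSecretManager();
    // manager is what TimelineServerUtils.addTimelineAuthFilter() hands to
    // TimelineAuthenticationFilter so the web endpoint can issue tokens.

    service.stop();
  }
}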

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timeline.security contains classes related
* to timeline authentication filters and abstract delegation token service for
* ATSv1 and ATSv2.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.timeline.security;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.util.timeline;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
/**
* Set of utility methods to be used across timeline reader and collector.
*/
public final class TimelineServerUtils {
private static final Log LOG = LogFactory.getLog(TimelineServerUtils.class);
private TimelineServerUtils() {
}
/**
* Sets filter initializers configuration based on existing configuration and
* default filters added by timeline service (such as timeline auth filter and
* CORS filter).
* @param conf Configuration object.
* @param configuredInitializers Comma separated list of filter initializers.
* @param defaultInitializers Set of initializers added by default by timeline
* service.
*/
public static void setTimelineFilters(Configuration conf,
String configuredInitializers, Set<String> defaultInitializers) {
String[] parts = configuredInitializers.split(",");
Set<String> target = new LinkedHashSet<String>();
for (String filterInitializer : parts) {
filterInitializer = filterInitializer.trim();
if (filterInitializer.equals(
AuthenticationFilterInitializer.class.getName()) ||
filterInitializer.isEmpty()) {
continue;
}
target.add(filterInitializer);
}
target.addAll(defaultInitializers);
String actualInitializers =
org.apache.commons.lang.StringUtils.join(target, ",");
LOG.info("Filter initializers set for timeline service: " +
actualInitializers);
conf.set("hadoop.http.filter.initializers", actualInitializers);
}
/**
* Adds timeline authentication filter to the set of default filter
* initializers and assigns the delegation token manager service to it.
* @param initializers Comma separated list of filter initializers.
* @param defaultInitializers Set of initializers added by default by timeline
* service.
* @param delegationTokenMgrService Delegation token manager service.
* This will be used by timeline authentication filter to assign
* delegation tokens.
*/
public static void addTimelineAuthFilter(String initializers,
Set<String> defaultInitializers,
TimelineDelgationTokenSecretManagerService delegationTokenMgrService) {
TimelineAuthenticationFilter.setTimelineDelegationTokenSecretManager(
delegationTokenMgrService.getTimelineDelegationTokenSecretManager());
if (!initializers.contains(
TimelineAuthenticationFilterInitializer.class.getName())) {
defaultInitializers.add(
TimelineAuthenticationFilterInitializer.class.getName());
}
}
}
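To make the merge behaviour concrete: configured initializers keep their order, the plain Hadoop AuthenticationFilterInitializer and empty entries are dropped, and the timeline defaults are appended. A small self-contained sketch, where com.example.MyFilterInitializer is a hypothetical third-party initializer used only for illustration (addTimelineAuthFilter would additionally wire the secret manager, which is skipped here):

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;

// Hypothetical driver, for illustration only.
public class FilterMergeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical pre-existing value configured by the cluster admin.
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.security.AuthenticationFilterInitializer,"
            + "com.example.MyFilterInitializer");

    Set<String> defaults = new LinkedHashSet<String>();
    defaults.add(TimelineAuthenticationFilterInitializer.class.getName());

    TimelineServerUtils.setTimelineFilters(
        conf, conf.get("hadoop.http.filter.initializers", ""), defaults);

    // Prints the merged list:
    //   com.example.MyFilterInitializer,
    //   org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer
    System.out.println(conf.get("hadoop.http.filter.initializers"));
  }
}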

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.util.timeline contains utility classes used
* by ATSv1 and ATSv2 on the server side.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.util.timeline;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -18,19 +18,19 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -40,6 +40,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -65,17 +67,51 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private volatile CollectorNodemanagerProtocol nmCollectorService;
private TimelineV2DelegationTokenSecretManagerService tokenMgrService;
private final boolean runningAsAuxService;
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
@VisibleForTesting
protected NodeTimelineCollectorManager() {
this(true);
}
protected NodeTimelineCollectorManager(boolean asAuxService) {
super(NodeTimelineCollectorManager.class.getName());
this.runningAsAuxService = asAuxService;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
tokenMgrService = new TimelineV2DelegationTokenSecretManagerService();
addService(tokenMgrService);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
startWebApp();
if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) {
// Do security login for cases where collector is running outside NM.
try {
doSecureLogin();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
}
super.serviceStart();
startWebApp();
}
private void doSecureLogin() throws IOException {
Configuration conf = getConfig();
InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed(
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST), 0,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST);
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, addr.getHostName());
}
@Override
@ -105,6 +141,12 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
*/
private void startWebApp() {
Configuration conf = getConfig();
String initializers = conf.get("hadoop.http.filter.initializers", "");
Set<String> defaultInitializers = new LinkedHashSet<String>();
TimelineServerUtils.addTimelineAuthFilter(
initializers, defaultInitializers, tokenMgrService);
TimelineServerUtils.setTimelineFilters(
conf, initializers, defaultInitializers);
String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
try {
@ -114,16 +156,10 @@ private void startWebApp() {
.addEndpoint(URI.create(
(YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
bindAddress));
if (YarnConfiguration.useHttps(conf)) {
builder = WebAppUtils.loadSslConfiguration(builder, conf);
}
timelineRestServer = builder.build();
// TODO: replace this by an authentication filter in future.
HashMap<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
timelineRestServer.addJerseyResourcePackage(
TimelineCollectorWebService.class.getPackage().getName() + ";"

View File

@ -61,7 +61,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
private ScheduledExecutorService scheduler;
public PerNodeTimelineCollectorsAuxService() {
this(new NodeTimelineCollectorManager());
this(new NodeTimelineCollectorManager(true));
}
@VisibleForTesting PerNodeTimelineCollectorsAuxService(
@ -202,7 +202,8 @@ public ByteBuffer getMetaData() {
PerNodeTimelineCollectorsAuxService auxService = null;
try {
auxService = collectorManager == null ?
new PerNodeTimelineCollectorsAuxService() :
new PerNodeTimelineCollectorsAuxService(
new NodeTimelineCollectorManager(false)) :
new PerNodeTimelineCollectorsAuxService(collectorManager);
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
SHUTDOWN_HOOK_PRIORITY);

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -47,7 +47,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelineCollectorManager extends AbstractService {
public class TimelineCollectorManager extends CompositeService {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineCollectorManager.class);
@ -57,7 +57,7 @@ public class TimelineCollectorManager extends AbstractService {
private boolean writerFlusherRunning;
@Override
public void serviceInit(Configuration conf) throws Exception {
protected void serviceInit(Configuration conf) throws Exception {
writer = createTimelineWriter(conf);
writer.init(conf);
// create a single dedicated thread for flushing the writer on a periodic

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.security;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
/**
* The service wrapper of {@link TimelineV2DelegationTokenSecretManager}.
*/
public class TimelineV2DelegationTokenSecretManagerService extends
TimelineDelgationTokenSecretManagerService {
public TimelineV2DelegationTokenSecretManagerService() {
super(TimelineV2DelegationTokenSecretManagerService.class.getName());
}
@Override
protected AbstractDelegationTokenSecretManager
<TimelineDelegationTokenIdentifier>
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
long tokenMaxLifetime, long tokenRenewInterval,
long tokenRemovalScanInterval) {
return new TimelineV2DelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval);
}
/**
* Delegation token secret manager for ATSv2.
*/
@Private
@Unstable
public static class TimelineV2DelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
/**
* Create a timeline v2 secret manager.
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
* new secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
* tokens in milliseconds
* @param delegationTokenRenewInterval how often the tokens must be renewed
* in milliseconds
* @param delegationTokenRemoverScanInterval how often the tokens are
* scanned for expired tokens in milliseconds
*/
public TimelineV2DelegationTokenSecretManager(
long delegationKeyUpdateInterval, long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
}
@Override
public TimelineDelegationTokenIdentifier createIdentifier() {
return new TimelineDelegationTokenIdentifier();
}
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.security contains classes
* to be used to generate delegation tokens for ATSv2.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.timelineservice.security;
import org.apache.hadoop.classification.InterfaceAudience;