YARN-5647. [ATSv2 Security] Collector side changes for loading auth filters and principals. Contributed by Varun Saxena
(cherry picked from commit 79dae624793164cd4692c86992a511310145858a)
This commit is contained in:
parent
c8db08dfac
commit
9577900a91
|
@ -20,14 +20,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
|
||||||
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
|
@ -47,10 +47,9 @@ import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineV1DelegationTokenSecretManagerService;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.webapp.CrossOriginFilterInitializer;
|
import org.apache.hadoop.yarn.server.timeline.webapp.CrossOriginFilterInitializer;
|
||||||
|
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.WebApp;
|
import org.apache.hadoop.yarn.webapp.WebApp;
|
||||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
@ -75,7 +74,7 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
private ApplicationACLsManager aclsManager;
|
private ApplicationACLsManager aclsManager;
|
||||||
private ApplicationHistoryManager historyManager;
|
private ApplicationHistoryManager historyManager;
|
||||||
private TimelineStore timelineStore;
|
private TimelineStore timelineStore;
|
||||||
private TimelineDelegationTokenSecretManagerService secretManagerService;
|
private TimelineV1DelegationTokenSecretManagerService secretManagerService;
|
||||||
private TimelineDataManager timelineDataManager;
|
private TimelineDataManager timelineDataManager;
|
||||||
private WebApp webApp;
|
private WebApp webApp;
|
||||||
private JvmPauseMonitor pauseMonitor;
|
private JvmPauseMonitor pauseMonitor;
|
||||||
|
@ -223,9 +222,9 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
TimelineStore.class), conf);
|
TimelineStore.class), conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TimelineDelegationTokenSecretManagerService
|
private TimelineV1DelegationTokenSecretManagerService
|
||||||
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
|
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
|
||||||
return new TimelineDelegationTokenSecretManagerService();
|
return new TimelineV1DelegationTokenSecretManagerService();
|
||||||
}
|
}
|
||||||
|
|
||||||
private TimelineDataManager createTimelineDataManager(Configuration conf) {
|
private TimelineDataManager createTimelineDataManager(Configuration conf) {
|
||||||
|
@ -237,63 +236,33 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void startWebApp() {
|
private void startWebApp() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
TimelineAuthenticationFilter.setTimelineDelegationTokenSecretManager(
|
|
||||||
secretManagerService.getTimelineDelegationTokenSecretManager());
|
|
||||||
// Always load pseudo authentication filter to parse "user.name" in an URL
|
// Always load pseudo authentication filter to parse "user.name" in an URL
|
||||||
// to identify a HTTP request's user in insecure mode.
|
// to identify a HTTP request's user in insecure mode.
|
||||||
// When Kerberos authentication type is set (i.e., secure mode is turned on),
|
// When Kerberos authentication type is set (i.e., secure mode is turned on),
|
||||||
// the customized filter will be loaded by the timeline server to do Kerberos
|
// the customized filter will be loaded by the timeline server to do Kerberos
|
||||||
// + DT authentication.
|
// + DT authentication.
|
||||||
String initializers = conf.get("hadoop.http.filter.initializers");
|
String initializers = conf.get("hadoop.http.filter.initializers", "");
|
||||||
boolean modifiedInitializers = false;
|
Set<String> defaultInitializers = new LinkedHashSet<String>();
|
||||||
|
// Add CORS filter
|
||||||
initializers =
|
|
||||||
initializers == null || initializers.length() == 0 ? "" : initializers;
|
|
||||||
|
|
||||||
if (!initializers.contains(CrossOriginFilterInitializer.class.getName())) {
|
if (!initializers.contains(CrossOriginFilterInitializer.class.getName())) {
|
||||||
if(conf.getBoolean(YarnConfiguration
|
if(conf.getBoolean(YarnConfiguration.
|
||||||
.TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED, YarnConfiguration
|
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED,
|
||||||
.TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT)) {
|
YarnConfiguration.
|
||||||
if (initializers.contains(HttpCrossOriginFilterInitializer.class.getName())) {
|
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT)) {
|
||||||
initializers =
|
if (initializers.contains(
|
||||||
initializers.replaceAll(HttpCrossOriginFilterInitializer.class.getName(),
|
HttpCrossOriginFilterInitializer.class.getName())) {
|
||||||
|
initializers = initializers.replaceAll(
|
||||||
|
HttpCrossOriginFilterInitializer.class.getName(),
|
||||||
CrossOriginFilterInitializer.class.getName());
|
CrossOriginFilterInitializer.class.getName());
|
||||||
|
} else {
|
||||||
|
defaultInitializers.add(CrossOriginFilterInitializer.class.getName());
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
if (initializers.length() != 0) {
|
|
||||||
initializers += ",";
|
|
||||||
}
|
|
||||||
initializers += CrossOriginFilterInitializer.class.getName();
|
|
||||||
}
|
|
||||||
modifiedInitializers = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
TimelineServerUtils.addTimelineAuthFilter(
|
||||||
if (!initializers.contains(TimelineAuthenticationFilterInitializer.class
|
initializers, defaultInitializers, secretManagerService);
|
||||||
.getName())) {
|
TimelineServerUtils.setTimelineFilters(
|
||||||
if (initializers.length() != 0) {
|
conf, initializers, defaultInitializers);
|
||||||
initializers += ",";
|
|
||||||
}
|
|
||||||
initializers += TimelineAuthenticationFilterInitializer.class.getName();
|
|
||||||
modifiedInitializers = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
String[] parts = initializers.split(",");
|
|
||||||
ArrayList<String> target = new ArrayList<String>();
|
|
||||||
for (String filterInitializer : parts) {
|
|
||||||
filterInitializer = filterInitializer.trim();
|
|
||||||
if (filterInitializer.equals(AuthenticationFilterInitializer.class
|
|
||||||
.getName())) {
|
|
||||||
modifiedInitializers = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
target.add(filterInitializer);
|
|
||||||
}
|
|
||||||
String actualInitializers =
|
|
||||||
org.apache.commons.lang.StringUtils.join(target, ",");
|
|
||||||
if (modifiedInitializers) {
|
|
||||||
conf.set("hadoop.http.filter.initializers", actualInitializers);
|
|
||||||
}
|
|
||||||
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
|
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
|
||||||
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
|
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
|
||||||
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
|
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
|
@ -37,18 +36,16 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The service wrapper of {@link TimelineDelegationTokenSecretManager}
|
* The service wrapper of {@link TimelineV1DelegationTokenSecretManager}.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class TimelineDelegationTokenSecretManagerService extends
|
public class TimelineV1DelegationTokenSecretManagerService extends
|
||||||
AbstractService {
|
TimelineDelgationTokenSecretManagerService {
|
||||||
|
|
||||||
private TimelineDelegationTokenSecretManager secretManager = null;
|
|
||||||
private TimelineStateStore stateStore = null;
|
private TimelineStateStore stateStore = null;
|
||||||
|
|
||||||
public TimelineDelegationTokenSecretManagerService() {
|
public TimelineV1DelegationTokenSecretManagerService() {
|
||||||
super(TimelineDelegationTokenSecretManagerService.class.getName());
|
super(TimelineV1DelegationTokenSecretManagerService.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -58,19 +55,7 @@ public class TimelineDelegationTokenSecretManagerService extends
|
||||||
stateStore = createStateStore(conf);
|
stateStore = createStateStore(conf);
|
||||||
stateStore.init(conf);
|
stateStore.init(conf);
|
||||||
}
|
}
|
||||||
|
super.serviceInit(conf);
|
||||||
long secretKeyInterval =
|
|
||||||
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL,
|
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL);
|
|
||||||
long tokenMaxLifetime =
|
|
||||||
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME,
|
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME);
|
|
||||||
long tokenRenewInterval =
|
|
||||||
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
|
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
|
|
||||||
secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval,
|
|
||||||
tokenMaxLifetime, tokenRenewInterval, 3600000, stateStore);
|
|
||||||
super.init(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -78,10 +63,9 @@ public class TimelineDelegationTokenSecretManagerService extends
|
||||||
if (stateStore != null) {
|
if (stateStore != null) {
|
||||||
stateStore.start();
|
stateStore.start();
|
||||||
TimelineServiceState state = stateStore.loadState();
|
TimelineServiceState state = stateStore.loadState();
|
||||||
secretManager.recover(state);
|
((TimelineV1DelegationTokenSecretManager)
|
||||||
|
getTimelineDelegationTokenSecretManager()).recover(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
secretManager.startThreads();
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,9 +74,18 @@ public class TimelineDelegationTokenSecretManagerService extends
|
||||||
if (stateStore != null) {
|
if (stateStore != null) {
|
||||||
stateStore.stop();
|
stateStore.stop();
|
||||||
}
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
secretManager.stopThreads();
|
@Override
|
||||||
super.stop();
|
protected AbstractDelegationTokenSecretManager
|
||||||
|
<TimelineDelegationTokenIdentifier>
|
||||||
|
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
|
||||||
|
long tokenMaxLifetime, long tokenRenewInterval,
|
||||||
|
long tokenRemovalScanInterval) {
|
||||||
|
return new TimelineV1DelegationTokenSecretManager(secretKeyInterval,
|
||||||
|
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval,
|
||||||
|
stateStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TimelineStateStore createStateStore(
|
protected TimelineStateStore createStateStore(
|
||||||
|
@ -104,27 +97,20 @@ public class TimelineDelegationTokenSecretManagerService extends
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ge the instance of {link #TimelineDelegationTokenSecretManager}
|
* Delegation token secret manager for ATSv1 and ATSv1.5.
|
||||||
*
|
|
||||||
* @return the instance of {link #TimelineDelegationTokenSecretManager}
|
|
||||||
*/
|
*/
|
||||||
public TimelineDelegationTokenSecretManager
|
|
||||||
getTimelineDelegationTokenSecretManager() {
|
|
||||||
return secretManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public static class TimelineDelegationTokenSecretManager extends
|
public static class TimelineV1DelegationTokenSecretManager extends
|
||||||
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
|
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
|
||||||
|
|
||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TimelineDelegationTokenSecretManager.class);
|
LoggerFactory.getLogger(TimelineV1DelegationTokenSecretManager.class);
|
||||||
|
|
||||||
private TimelineStateStore stateStore;
|
private TimelineStateStore stateStore;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a timeline secret manager
|
* Create a timeline v1 secret manager.
|
||||||
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
|
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
|
||||||
* new secret keys.
|
* new secret keys.
|
||||||
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
|
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
|
||||||
|
@ -135,7 +121,7 @@ public class TimelineDelegationTokenSecretManagerService extends
|
||||||
* scanned for expired tokens in milliseconds
|
* scanned for expired tokens in milliseconds
|
||||||
* @param stateStore timeline service state store
|
* @param stateStore timeline service state store
|
||||||
*/
|
*/
|
||||||
public TimelineDelegationTokenSecretManager(
|
public TimelineV1DelegationTokenSecretManager(
|
||||||
long delegationKeyUpdateInterval,
|
long delegationKeyUpdateInterval,
|
||||||
long delegationTokenMaxLifetime,
|
long delegationTokenMaxLifetime,
|
||||||
long delegationTokenRenewInterval,
|
long delegationTokenRenewInterval,
|
||||||
|
@ -236,5 +222,4 @@ public class TimelineDelegationTokenSecretManagerService extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -55,27 +55,31 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test cases for authentication via TimelineAuthenticationFilter while
|
||||||
|
* publishing entities for ATSv1.
|
||||||
|
*/
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class TestTimelineAuthenticationFilter {
|
public class TestTimelineAuthenticationFilterForV1 {
|
||||||
|
|
||||||
private static final String FOO_USER = "foo";
|
private static final String FOO_USER = "foo";
|
||||||
private static final String BAR_USER = "bar";
|
private static final String BAR_USER = "bar";
|
||||||
private static final String HTTP_USER = "HTTP";
|
private static final String HTTP_USER = "HTTP";
|
||||||
|
|
||||||
private static final File testRootDir = new File(
|
private static final File TEST_ROOT_DIR = new File(
|
||||||
System.getProperty("test.build.dir", "target/test-dir"),
|
System.getProperty("test.build.dir", "target/test-dir"),
|
||||||
TestTimelineAuthenticationFilter.class.getName() + "-root");
|
TestTimelineAuthenticationFilterForV1.class.getName() + "-root");
|
||||||
private static File httpSpnegoKeytabFile = new File(
|
private static File httpSpnegoKeytabFile = new File(
|
||||||
KerberosTestUtils.getKeytabFile());
|
KerberosTestUtils.getKeytabFile());
|
||||||
private static String httpSpnegoPrincipal =
|
private static String httpSpnegoPrincipal =
|
||||||
KerberosTestUtils.getServerPrincipal();
|
KerberosTestUtils.getServerPrincipal();
|
||||||
private static final String BASEDIR =
|
private static final String BASEDIR =
|
||||||
System.getProperty("test.build.dir", "target/test-dir") + "/"
|
System.getProperty("test.build.dir", "target/test-dir") + "/"
|
||||||
+ TestTimelineAuthenticationFilter.class.getSimpleName();
|
+ TestTimelineAuthenticationFilterForV1.class.getSimpleName();
|
||||||
|
|
||||||
@Parameterized.Parameters
|
@Parameterized.Parameters
|
||||||
public static Collection<Object[]> withSsl() {
|
public static Collection<Object[]> withSsl() {
|
||||||
return Arrays.asList(new Object[][] { { false }, { true } });
|
return Arrays.asList(new Object[][] {{false}, {true}});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MiniKdc testMiniKDC;
|
private static MiniKdc testMiniKDC;
|
||||||
|
@ -85,14 +89,14 @@ public class TestTimelineAuthenticationFilter {
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
private static boolean withSsl;
|
private static boolean withSsl;
|
||||||
|
|
||||||
public TestTimelineAuthenticationFilter(boolean withSsl) {
|
public TestTimelineAuthenticationFilterForV1(boolean withSsl) {
|
||||||
TestTimelineAuthenticationFilter.withSsl = withSsl;
|
TestTimelineAuthenticationFilterForV1.withSsl = withSsl;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setup() {
|
public static void setup() {
|
||||||
try {
|
try {
|
||||||
testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
|
testMiniKDC = new MiniKdc(MiniKdc.createConf(), TEST_ROOT_DIR);
|
||||||
testMiniKDC.start();
|
testMiniKDC.start();
|
||||||
testMiniKDC.createPrincipal(
|
testMiniKDC.createPrincipal(
|
||||||
httpSpnegoKeytabFile, HTTP_USER + "/localhost");
|
httpSpnegoKeytabFile, HTTP_USER + "/localhost");
|
||||||
|
@ -111,11 +115,11 @@ public class TestTimelineAuthenticationFilter {
|
||||||
KerberosAuthenticationHandler.KEYTAB,
|
KerberosAuthenticationHandler.KEYTAB,
|
||||||
httpSpnegoKeytabFile.getAbsolutePath());
|
httpSpnegoKeytabFile.getAbsolutePath());
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL,
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL,
|
||||||
httpSpnegoPrincipal);
|
httpSpnegoPrincipal);
|
||||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
||||||
httpSpnegoKeytabFile.getAbsolutePath());
|
httpSpnegoKeytabFile.getAbsolutePath());
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
|
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
|
||||||
MemoryTimelineStore.class, TimelineStore.class);
|
MemoryTimelineStore.class, TimelineStore.class);
|
||||||
|
@ -136,8 +140,8 @@ public class TestTimelineAuthenticationFilter {
|
||||||
FileUtil.fullyDelete(base);
|
FileUtil.fullyDelete(base);
|
||||||
base.mkdirs();
|
base.mkdirs();
|
||||||
keystoresDir = new File(BASEDIR).getAbsolutePath();
|
keystoresDir = new File(BASEDIR).getAbsolutePath();
|
||||||
sslConfDir =
|
sslConfDir = KeyStoreTestUtil.getClasspathDir(
|
||||||
KeyStoreTestUtil.getClasspathDir(TestTimelineAuthenticationFilter.class);
|
TestTimelineAuthenticationFilterForV1.class);
|
||||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
|
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,6 +149,7 @@ public class TestTimelineAuthenticationFilter {
|
||||||
testTimelineServer.init(conf);
|
testTimelineServer.init(conf);
|
||||||
testTimelineServer.start();
|
testTimelineServer.start();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
assertTrue("Couldn't setup TimelineServer", false);
|
assertTrue("Couldn't setup TimelineServer", false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,14 +186,14 @@ public class TestTimelineAuthenticationFilter {
|
||||||
TimelineClient client = createTimelineClientForUGI();
|
TimelineClient client = createTimelineClientForUGI();
|
||||||
TimelineEntity entityToStore = new TimelineEntity();
|
TimelineEntity entityToStore = new TimelineEntity();
|
||||||
entityToStore.setEntityType(
|
entityToStore.setEntityType(
|
||||||
TestTimelineAuthenticationFilter.class.getName());
|
TestTimelineAuthenticationFilterForV1.class.getName());
|
||||||
entityToStore.setEntityId("entity1");
|
entityToStore.setEntityId("entity1");
|
||||||
entityToStore.setStartTime(0L);
|
entityToStore.setStartTime(0L);
|
||||||
TimelinePutResponse putResponse = client.putEntities(entityToStore);
|
TimelinePutResponse putResponse = client.putEntities(entityToStore);
|
||||||
Assert.assertEquals(0, putResponse.getErrors().size());
|
Assert.assertEquals(0, putResponse.getErrors().size());
|
||||||
TimelineEntity entityToRead =
|
TimelineEntity entityToRead =
|
||||||
testTimelineServer.getTimelineStore().getEntity(
|
testTimelineServer.getTimelineStore().getEntity("entity1",
|
||||||
"entity1", TestTimelineAuthenticationFilter.class.getName(), null);
|
TestTimelineAuthenticationFilterForV1.class.getName(), null);
|
||||||
Assert.assertNotNull(entityToRead);
|
Assert.assertNotNull(entityToRead);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -202,13 +207,14 @@ public class TestTimelineAuthenticationFilter {
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
TimelineClient client = createTimelineClientForUGI();
|
TimelineClient client = createTimelineClientForUGI();
|
||||||
TimelineDomain domainToStore = new TimelineDomain();
|
TimelineDomain domainToStore = new TimelineDomain();
|
||||||
domainToStore.setId(TestTimelineAuthenticationFilter.class.getName());
|
domainToStore.setId(
|
||||||
|
TestTimelineAuthenticationFilterForV1.class.getName());
|
||||||
domainToStore.setReaders("*");
|
domainToStore.setReaders("*");
|
||||||
domainToStore.setWriters("*");
|
domainToStore.setWriters("*");
|
||||||
client.putDomain(domainToStore);
|
client.putDomain(domainToStore);
|
||||||
TimelineDomain domainToRead =
|
TimelineDomain domainToRead =
|
||||||
testTimelineServer.getTimelineStore().getDomain(
|
testTimelineServer.getTimelineStore().getDomain(
|
||||||
TestTimelineAuthenticationFilter.class.getName());
|
TestTimelineAuthenticationFilterForV1.class.getName());
|
||||||
Assert.assertNotNull(domainToRead);
|
Assert.assertNotNull(domainToRead);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -218,22 +224,24 @@ public class TestTimelineAuthenticationFilter {
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenOperations() throws Exception {
|
public void testDelegationTokenOperations() throws Exception {
|
||||||
TimelineClient httpUserClient =
|
TimelineClient httpUserClient =
|
||||||
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<TimelineClient>() {
|
KerberosTestUtils.doAs(HTTP_USER + "/localhost",
|
||||||
@Override
|
new Callable<TimelineClient>() {
|
||||||
public TimelineClient call() throws Exception {
|
@Override
|
||||||
return createTimelineClientForUGI();
|
public TimelineClient call() throws Exception {
|
||||||
}
|
return createTimelineClientForUGI();
|
||||||
});
|
}
|
||||||
|
});
|
||||||
UserGroupInformation httpUser =
|
UserGroupInformation httpUser =
|
||||||
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<UserGroupInformation>() {
|
KerberosTestUtils.doAs(HTTP_USER + "/localhost",
|
||||||
@Override
|
new Callable<UserGroupInformation>() {
|
||||||
public UserGroupInformation call() throws Exception {
|
@Override
|
||||||
return UserGroupInformation.getCurrentUser();
|
public UserGroupInformation call() throws Exception {
|
||||||
}
|
return UserGroupInformation.getCurrentUser();
|
||||||
});
|
}
|
||||||
|
});
|
||||||
// Let HTTP user to get the delegation for itself
|
// Let HTTP user to get the delegation for itself
|
||||||
Token<TimelineDelegationTokenIdentifier> token =
|
Token<TimelineDelegationTokenIdentifier> token =
|
||||||
httpUserClient.getDelegationToken(httpUser.getShortUserName());
|
httpUserClient.getDelegationToken(httpUser.getShortUserName());
|
||||||
Assert.assertNotNull(token);
|
Assert.assertNotNull(token);
|
||||||
TimelineDelegationTokenIdentifier tDT = token.decodeIdentifier();
|
TimelineDelegationTokenIdentifier tDT = token.decodeIdentifier();
|
||||||
Assert.assertNotNull(tDT);
|
Assert.assertNotNull(tDT);
|
||||||
|
@ -317,7 +325,8 @@ public class TestTimelineAuthenticationFilter {
|
||||||
barUserClient.getDelegationToken(httpUser.getShortUserName());
|
barUserClient.getDelegationToken(httpUser.getShortUserName());
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertTrue(e.getCause() instanceof AuthorizationException || e.getCause() instanceof AuthenticationException);
|
Assert.assertTrue(e.getCause() instanceof AuthorizationException ||
|
||||||
|
e.getCause() instanceof AuthenticationException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -23,27 +23,33 @@ import javax.servlet.ServletException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService.TimelineDelegationTokenSecretManager;
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeline authentication filter provides delegation token support for ATSv1
|
||||||
|
* and ATSv2.
|
||||||
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class TimelineAuthenticationFilter
|
public class TimelineAuthenticationFilter
|
||||||
extends DelegationTokenAuthenticationFilter {
|
extends DelegationTokenAuthenticationFilter {
|
||||||
|
|
||||||
private static TimelineDelegationTokenSecretManager secretManager;
|
private static AbstractDelegationTokenSecretManager
|
||||||
|
<TimelineDelegationTokenIdentifier> secretManager;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(FilterConfig filterConfig) throws ServletException {
|
public void init(FilterConfig filterConfig) throws ServletException {
|
||||||
filterConfig.getServletContext().setAttribute(
|
filterConfig.getServletContext().setAttribute(
|
||||||
DelegationTokenAuthenticationFilter.DELEGATION_TOKEN_SECRET_MANAGER_ATTR,
|
DelegationTokenAuthenticationFilter.
|
||||||
secretManager);
|
DELEGATION_TOKEN_SECRET_MANAGER_ATTR, secretManager);
|
||||||
super.init(filterConfig);
|
super.init(filterConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void setTimelineDelegationTokenSecretManager(
|
public static void setTimelineDelegationTokenSecretManager(
|
||||||
TimelineDelegationTokenSecretManager secretManager) {
|
AbstractDelegationTokenSecretManager
|
||||||
TimelineAuthenticationFilter.secretManager = secretManager;
|
<TimelineDelegationTokenIdentifier> secretMgr) {
|
||||||
|
TimelineAuthenticationFilter.secretManager = secretMgr;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timeline.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract implementation of delegation token manager service for different
|
||||||
|
* versions of timeline service.
|
||||||
|
*/
|
||||||
|
public abstract class TimelineDelgationTokenSecretManagerService extends
|
||||||
|
AbstractService {
|
||||||
|
|
||||||
|
public TimelineDelgationTokenSecretManagerService(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long delegationTokenRemovalScanInterval = 3600000L;
|
||||||
|
|
||||||
|
private AbstractDelegationTokenSecretManager
|
||||||
|
<TimelineDelegationTokenIdentifier> secretManager = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
long secretKeyInterval =
|
||||||
|
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL,
|
||||||
|
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_KEY_UPDATE_INTERVAL);
|
||||||
|
long tokenMaxLifetime =
|
||||||
|
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME,
|
||||||
|
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME);
|
||||||
|
long tokenRenewInterval =
|
||||||
|
conf.getLong(YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
|
||||||
|
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
|
||||||
|
secretManager = createTimelineDelegationTokenSecretManager(
|
||||||
|
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval,
|
||||||
|
delegationTokenRemovalScanInterval);
|
||||||
|
super.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract
|
||||||
|
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier>
|
||||||
|
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
|
||||||
|
long tokenMaxLifetime, long tokenRenewInterval,
|
||||||
|
long tokenRemovalScanInterval);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
secretManager.startThreads();
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
secretManager.stopThreads();
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractDelegationTokenSecretManager
|
||||||
|
<TimelineDelegationTokenIdentifier>
|
||||||
|
getTimelineDelegationTokenSecretManager() {
|
||||||
|
return secretManager;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Package org.apache.hadoop.server.timeline.security contains classes related
|
||||||
|
* to timeline authentication filters and abstract delegation token service for
|
||||||
|
* ATSv1 and ATSv2.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
package org.apache.hadoop.yarn.server.timeline.security;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.util.timeline;
|
||||||
|
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set of utility methods to be used across timeline reader and collector.
|
||||||
|
*/
|
||||||
|
public final class TimelineServerUtils {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TimelineServerUtils.class);
|
||||||
|
|
||||||
|
private TimelineServerUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets filter initializers configuration based on existing configuration and
|
||||||
|
* default filters added by timeline service(such as timeline auth filter and
|
||||||
|
* CORS filter).
|
||||||
|
* @param conf Configuration object.
|
||||||
|
* @param configuredInitializers Comma separated list of filter initializers.
|
||||||
|
* @param defaultInitializers Set of initializers added by default by timeline
|
||||||
|
* service.
|
||||||
|
*/
|
||||||
|
public static void setTimelineFilters(Configuration conf,
|
||||||
|
String configuredInitializers, Set<String> defaultInitializers) {
|
||||||
|
String[] parts = configuredInitializers.split(",");
|
||||||
|
Set<String> target = new LinkedHashSet<String>();
|
||||||
|
for (String filterInitializer : parts) {
|
||||||
|
filterInitializer = filterInitializer.trim();
|
||||||
|
if (filterInitializer.equals(
|
||||||
|
AuthenticationFilterInitializer.class.getName()) ||
|
||||||
|
filterInitializer.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
target.add(filterInitializer);
|
||||||
|
}
|
||||||
|
target.addAll(defaultInitializers);
|
||||||
|
String actualInitializers =
|
||||||
|
org.apache.commons.lang.StringUtils.join(target, ",");
|
||||||
|
LOG.info("Filter initializers set for timeline service: " +
|
||||||
|
actualInitializers);
|
||||||
|
conf.set("hadoop.http.filter.initializers", actualInitializers);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds timeline authentication filter to the set of default filter
|
||||||
|
* initializers and assigns the delegation token manager service to it.
|
||||||
|
* @param initializers Comma separated list of filter initializers.
|
||||||
|
* @param defaultInitializers Set of initializers added by default by timeline
|
||||||
|
* service.
|
||||||
|
* @param delegationTokenMgrService Delegation token manager service.
|
||||||
|
* This will be used by timeline authentication filter to assign
|
||||||
|
* delegation tokens.
|
||||||
|
*/
|
||||||
|
public static void addTimelineAuthFilter(String initializers,
|
||||||
|
Set<String> defaultInitializers,
|
||||||
|
TimelineDelgationTokenSecretManagerService delegationTokenMgrService) {
|
||||||
|
TimelineAuthenticationFilter.setTimelineDelegationTokenSecretManager(
|
||||||
|
delegationTokenMgrService.getTimelineDelegationTokenSecretManager());
|
||||||
|
if (!initializers.contains(
|
||||||
|
TimelineAuthenticationFilterInitializer.class.getName())) {
|
||||||
|
defaultInitializers.add(
|
||||||
|
TimelineAuthenticationFilterInitializer.class.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Package org.apache.hadoop.server.util.timeline contains utility classes used
|
||||||
|
* by ATSv1 and ATSv2 on the server side.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
package org.apache.hadoop.yarn.server.util.timeline;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@ -18,13 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -32,7 +30,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
import org.apache.hadoop.http.lib.StaticUserWebFilter;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService;
|
||||||
|
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
@ -65,17 +67,51 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
|
|
||||||
private volatile CollectorNodemanagerProtocol nmCollectorService;
|
private volatile CollectorNodemanagerProtocol nmCollectorService;
|
||||||
|
|
||||||
|
private TimelineV2DelegationTokenSecretManagerService tokenMgrService;
|
||||||
|
|
||||||
|
private final boolean runningAsAuxService;
|
||||||
|
|
||||||
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected NodeTimelineCollectorManager() {
|
protected NodeTimelineCollectorManager() {
|
||||||
|
this(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected NodeTimelineCollectorManager(boolean asAuxService) {
|
||||||
super(NodeTimelineCollectorManager.class.getName());
|
super(NodeTimelineCollectorManager.class.getName());
|
||||||
|
this.runningAsAuxService = asAuxService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
tokenMgrService = new TimelineV2DelegationTokenSecretManagerService();
|
||||||
|
addService(tokenMgrService);
|
||||||
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
startWebApp();
|
if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) {
|
||||||
|
// Do security login for cases where collector is running outside NM.
|
||||||
|
try {
|
||||||
|
doSecureLogin();
|
||||||
|
} catch(IOException ie) {
|
||||||
|
throw new YarnRuntimeException("Failed to login", ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
|
startWebApp();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doSecureLogin() throws IOException {
|
||||||
|
Configuration conf = getConfig();
|
||||||
|
InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed(
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
|
||||||
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST), 0,
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST);
|
||||||
|
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, addr.getHostName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -105,6 +141,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
*/
|
*/
|
||||||
private void startWebApp() {
|
private void startWebApp() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
|
String initializers = conf.get("hadoop.http.filter.initializers", "");
|
||||||
|
Set<String> defaultInitializers = new LinkedHashSet<String>();
|
||||||
|
TimelineServerUtils.addTimelineAuthFilter(
|
||||||
|
initializers, defaultInitializers, tokenMgrService);
|
||||||
|
TimelineServerUtils.setTimelineFilters(
|
||||||
|
conf, initializers, defaultInitializers);
|
||||||
String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
|
String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
|
||||||
try {
|
try {
|
||||||
|
@ -114,16 +156,10 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
.addEndpoint(URI.create(
|
.addEndpoint(URI.create(
|
||||||
(YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
|
(YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
|
||||||
bindAddress));
|
bindAddress));
|
||||||
|
if (YarnConfiguration.useHttps(conf)) {
|
||||||
|
builder = WebAppUtils.loadSslConfiguration(builder, conf);
|
||||||
|
}
|
||||||
timelineRestServer = builder.build();
|
timelineRestServer = builder.build();
|
||||||
// TODO: replace this by an authentication filter in future.
|
|
||||||
HashMap<String, String> options = new HashMap<>();
|
|
||||||
String username = conf.get(HADOOP_HTTP_STATIC_USER,
|
|
||||||
DEFAULT_HADOOP_HTTP_STATIC_USER);
|
|
||||||
options.put(HADOOP_HTTP_STATIC_USER, username);
|
|
||||||
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
|
|
||||||
"static_user_filter_timeline",
|
|
||||||
StaticUserWebFilter.StaticUserFilter.class.getName(),
|
|
||||||
options, new String[] {"/*"});
|
|
||||||
|
|
||||||
timelineRestServer.addJerseyResourcePackage(
|
timelineRestServer.addJerseyResourcePackage(
|
||||||
TimelineCollectorWebService.class.getPackage().getName() + ";"
|
TimelineCollectorWebService.class.getPackage().getName() + ";"
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
private ScheduledExecutorService scheduler;
|
private ScheduledExecutorService scheduler;
|
||||||
|
|
||||||
public PerNodeTimelineCollectorsAuxService() {
|
public PerNodeTimelineCollectorsAuxService() {
|
||||||
this(new NodeTimelineCollectorManager());
|
this(new NodeTimelineCollectorManager(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting PerNodeTimelineCollectorsAuxService(
|
@VisibleForTesting PerNodeTimelineCollectorsAuxService(
|
||||||
|
@ -202,7 +202,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
PerNodeTimelineCollectorsAuxService auxService = null;
|
PerNodeTimelineCollectorsAuxService auxService = null;
|
||||||
try {
|
try {
|
||||||
auxService = collectorManager == null ?
|
auxService = collectorManager == null ?
|
||||||
new PerNodeTimelineCollectorsAuxService() :
|
new PerNodeTimelineCollectorsAuxService(
|
||||||
|
new NodeTimelineCollectorManager(false)) :
|
||||||
new PerNodeTimelineCollectorsAuxService(collectorManager);
|
new PerNodeTimelineCollectorsAuxService(collectorManager);
|
||||||
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
|
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
|
||||||
SHUTDOWN_HOOK_PRIORITY);
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -47,7 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class TimelineCollectorManager extends AbstractService {
|
public class TimelineCollectorManager extends CompositeService {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(TimelineCollectorManager.class);
|
LogFactory.getLog(TimelineCollectorManager.class);
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ public class TimelineCollectorManager extends AbstractService {
|
||||||
private boolean writerFlusherRunning;
|
private boolean writerFlusherRunning;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
writer = createTimelineWriter(conf);
|
writer = createTimelineWriter(conf);
|
||||||
writer.init(conf);
|
writer.init(conf);
|
||||||
// create a single dedicated thread for flushing the writer on a periodic
|
// create a single dedicated thread for flushing the writer on a periodic
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The service wrapper of {@link TimelineV2DelegationTokenSecretManager}.
|
||||||
|
*/
|
||||||
|
public class TimelineV2DelegationTokenSecretManagerService extends
|
||||||
|
TimelineDelgationTokenSecretManagerService {
|
||||||
|
public TimelineV2DelegationTokenSecretManagerService() {
|
||||||
|
super(TimelineV2DelegationTokenSecretManagerService.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AbstractDelegationTokenSecretManager
|
||||||
|
<TimelineDelegationTokenIdentifier>
|
||||||
|
createTimelineDelegationTokenSecretManager(long secretKeyInterval,
|
||||||
|
long tokenMaxLifetime, long tokenRenewInterval,
|
||||||
|
long tokenRemovalScanInterval) {
|
||||||
|
return new TimelineV2DelegationTokenSecretManager(secretKeyInterval,
|
||||||
|
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delegation token secret manager for ATSv2.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public static class TimelineV2DelegationTokenSecretManager extends
|
||||||
|
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a timeline v2 secret manager.
|
||||||
|
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
|
||||||
|
* new secret keys.
|
||||||
|
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
|
||||||
|
* tokens in milliseconds
|
||||||
|
* @param delegationTokenRenewInterval how often the tokens must be renewed
|
||||||
|
* in milliseconds
|
||||||
|
* @param delegationTokenRemoverScanInterval how often the tokens are
|
||||||
|
* scanned for expired tokens in milliseconds
|
||||||
|
*/
|
||||||
|
public TimelineV2DelegationTokenSecretManager(
|
||||||
|
long delegationKeyUpdateInterval, long delegationTokenMaxLifetime,
|
||||||
|
long delegationTokenRenewInterval,
|
||||||
|
long delegationTokenRemoverScanInterval) {
|
||||||
|
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
|
||||||
|
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDelegationTokenIdentifier createIdentifier() {
|
||||||
|
return new TimelineDelegationTokenIdentifier();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Package org.apache.hadoop.server.timelineservice.security contains classes
|
||||||
|
* to be used to generate delegation tokens for ATSv2.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.security;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
Loading…
Reference in New Issue