YARN-6133. [ATSv2 Security] Renew delegation token for app automatically if an app collector is active. Contributed by Varun Saxena.

This commit is contained in:
Rohith Sharma K S 2017-08-10 11:12:57 +05:30 committed by Varun Saxena
parent 7594d1de7b
commit 354be99dbf
6 changed files with 139 additions and 14 deletions

View File

@ -24,6 +24,7 @@
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -183,6 +184,13 @@ public void initialize() throws Exception {
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTP_ONLY.name()); HttpConfig.Policy.HTTP_ONLY.name());
} }
if (!withKerberosLogin) {
// For timeline delegation token based access, set delegation token renew
// interval to 100 ms. to test if timeline delegation token for the app is
// renewed automatically if app is still alive.
conf.setLong(
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, 100);
}
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
collectorManager = new DummyNodeTimelineCollectorManager(); collectorManager = new DummyNodeTimelineCollectorManager();
auxService = PerNodeTimelineCollectorsAuxService.launchServer( auxService = PerNodeTimelineCollectorsAuxService.launchServer(
@ -282,12 +290,12 @@ private static TimelineEntity readEntityFile(File entityFile)
} }
private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir, private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir,
String entityType) throws Exception { String entityType, int numEntities) throws Exception {
TimelineV2Client client = createTimelineClientForUGI(appId); TimelineV2Client client = createTimelineClientForUGI(appId);
try { try {
// Sync call. Results available immediately. // Sync call. Results available immediately.
client.putEntities(createEntity("entity1", entityType)); client.putEntities(createEntity("entity1", entityType));
assertEquals(1, entityTypeDir.listFiles().length); assertEquals(numEntities, entityTypeDir.listFiles().length);
verifyEntity(entityTypeDir, "entity1", entityType); verifyEntity(entityTypeDir, "entity1", entityType);
// Async call. // Async call.
client.putEntitiesAsync(createEntity("entity2", entityType)); client.putEntitiesAsync(createEntity("entity2", entityType));
@ -312,12 +320,22 @@ public void testPutTimelineEntities() throws Exception {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() { KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
publishAndVerifyEntity(appId, entityTypeDir, entityType); publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
return null; return null;
} }
}); });
} else { } else {
publishAndVerifyEntity(appId, entityTypeDir, entityType); publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
// Verify if token is renewed automatically and entities can still be
// published.
Thread.sleep(1000);
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector) collectorManager.get(appId);
assertNotNull(collector);
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
renewToken(eq(collector.getDelegationTokenForApp()),
any(String.class));
} }
// Wait for async entity to be published. // Wait for async entity to be published.
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
@ -330,6 +348,7 @@ public Void call() throws Exception {
verifyEntity(entityTypeDir, "entity2", entityType); verifyEntity(entityTypeDir, "entity2", entityType);
AppLevelTimelineCollector collector = AppLevelTimelineCollector collector =
(AppLevelTimelineCollector)collectorManager.get(appId); (AppLevelTimelineCollector)collectorManager.get(appId);
assertNotNull(collector);
auxService.removeApplication(appId); auxService.removeApplication(appId);
verify(collectorManager.getTokenManagerService()).cancelToken( verify(collectorManager.getTokenManagerService()).cancelToken(
eq(collector.getDelegationTokenForApp()), any(String.class)); eq(collector.getDelegationTokenForApp()), any(String.class));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.timelineservice.collector; package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -49,6 +51,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
private final TimelineCollectorContext context; private final TimelineCollectorContext context;
private UserGroupInformation currentUser; private UserGroupInformation currentUser;
private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp; private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp;
private Future<?> renewalFuture;
public AppLevelTimelineCollector(ApplicationId appId) { public AppLevelTimelineCollector(ApplicationId appId) {
this(appId, null); this(appId, null);
@ -70,9 +73,15 @@ public String getAppUser() {
return appUser; return appUser;
} }
void setDelegationTokenForApp( void setDelegationTokenAndFutureForApp(
Token<TimelineDelegationTokenIdentifier> token) { Token<TimelineDelegationTokenIdentifier> token,
Future<?> appRenewalFuture) {
this.delegationTokenForApp = token; this.delegationTokenForApp = token;
this.renewalFuture = appRenewalFuture;
}
void setRenewalFutureForApp(Future<?> appRenewalFuture) {
this.renewalFuture = appRenewalFuture;
} }
@VisibleForTesting @VisibleForTesting
@ -100,6 +109,9 @@ protected void serviceStart() throws Exception {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (renewalFuture != null && !renewalFuture.isDone()) {
renewalFuture.cancel(true);
}
super.serviceStop(); super.serviceStop();
} }
@ -107,5 +119,4 @@ protected void serviceStop() throws Exception {
public TimelineCollectorContext getTimelineEntityContext() { public TimelineCollectorContext getTimelineEntityContext() {
return context; return context;
} }
} }

View File

@ -23,6 +23,9 @@
import java.net.URI; import java.net.URI;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -32,6 +35,7 @@
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -50,6 +54,7 @@
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,6 +81,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private UserGroupInformation loginUGI; private UserGroupInformation loginUGI;
private ScheduledThreadPoolExecutor tokenRenewalExecutor;
private long tokenRenewInterval;
private static final long TIME_BEFORE_RENEW_DATE = 10 * 1000; // 10 seconds.
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
@VisibleForTesting @VisibleForTesting
@ -93,6 +104,9 @@ protected void serviceInit(Configuration conf) throws Exception {
tokenMgrService = createTokenManagerService(); tokenMgrService = createTokenManagerService();
addService(tokenMgrService); addService(tokenMgrService);
this.loginUGI = UserGroupInformation.getCurrentUser(); this.loginUGI = UserGroupInformation.getCurrentUser();
tokenRenewInterval = conf.getLong(
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -109,6 +123,9 @@ protected void serviceStart() throws Exception {
} }
this.loginUGI = UserGroupInformation.getLoginUser(); this.loginUGI = UserGroupInformation.getLoginUser();
} }
tokenRenewalExecutor = new ScheduledThreadPoolExecutor(
1, new ThreadFactoryBuilder().setNameFormat(
"App Collector Token Renewal thread").build());
super.serviceStart(); super.serviceStart();
startWebApp(); startWebApp();
} }
@ -139,6 +156,9 @@ protected void serviceStop() throws Exception {
if (timelineRestServer != null) { if (timelineRestServer != null) {
timelineRestServer.stop(); timelineRestServer.stop();
} }
if (tokenRenewalExecutor != null) {
tokenRenewalExecutor.shutdownNow();
}
super.serviceStop(); super.serviceStop();
} }
@ -152,6 +172,21 @@ public Token<TimelineDelegationTokenIdentifier> generateTokenForAppCollector(
return token; return token;
} }
@VisibleForTesting
public long renewTokenForAppCollector(
AppLevelTimelineCollector appCollector) throws IOException {
if (appCollector.getDelegationTokenForApp() != null) {
TimelineDelegationTokenIdentifier identifier =
appCollector.getDelegationTokenForApp().decodeIdentifier();
return tokenMgrService.renewToken(appCollector.getDelegationTokenForApp(),
identifier.getRenewer().toString());
} else {
LOG.info("Delegation token not available for renewal for app " +
appCollector.getTimelineEntityContext().getAppId());
return -1;
}
}
@VisibleForTesting @VisibleForTesting
public void cancelTokenForAppCollector( public void cancelTokenForAppCollector(
AppLevelTimelineCollector appCollector) throws IOException { AppLevelTimelineCollector appCollector) throws IOException {
@ -174,13 +209,19 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
(AppLevelTimelineCollector)collector; (AppLevelTimelineCollector)collector;
Token<TimelineDelegationTokenIdentifier> timelineToken = Token<TimelineDelegationTokenIdentifier> timelineToken =
generateTokenForAppCollector(appCollector.getAppUser()); generateTokenForAppCollector(appCollector.getAppUser());
appCollector.setDelegationTokenForApp(timelineToken); long renewalDelay = (tokenRenewInterval > TIME_BEFORE_RENEW_DATE) ?
tokenRenewInterval - TIME_BEFORE_RENEW_DATE : tokenRenewInterval;
Future<?> renewalFuture =
tokenRenewalExecutor.schedule(new CollectorTokenRenewer(appId),
renewalDelay, TimeUnit.MILLISECONDS);
appCollector.setDelegationTokenAndFutureForApp(timelineToken,
renewalFuture);
token = org.apache.hadoop.yarn.api.records.Token.newInstance( token = org.apache.hadoop.yarn.api.records.Token.newInstance(
timelineToken.getIdentifier(), timelineToken.getKind().toString(), timelineToken.getIdentifier(), timelineToken.getKind().toString(),
timelineToken.getPassword(), timelineToken.getService().toString()); timelineToken.getPassword(), timelineToken.getService().toString());
} }
// Report to NM if a new collector is added. // Report to NM if a new collector is added.
reportNewCollectorToNM(appId, token); reportNewCollectorInfoToNM(appId, token);
} catch (YarnException | IOException e) { } catch (YarnException | IOException e) {
// throw exception here as it cannot be used if failed communicate with NM // throw exception here as it cannot be used if failed communicate with NM
LOG.error("Failed to communicate with NM Collector Service for " + appId); LOG.error("Failed to communicate with NM Collector Service for " + appId);
@ -192,7 +233,7 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
protected void postRemove(ApplicationId appId, TimelineCollector collector) { protected void postRemove(ApplicationId appId, TimelineCollector collector) {
if (collector instanceof AppLevelTimelineCollector) { if (collector instanceof AppLevelTimelineCollector) {
try { try {
cancelTokenForAppCollector((AppLevelTimelineCollector)collector); cancelTokenForAppCollector((AppLevelTimelineCollector) collector);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to cancel token for app collector with appId " + LOG.warn("Failed to cancel token for app collector with appId " +
appId, e); appId, e);
@ -244,7 +285,7 @@ private void startWebApp() {
timelineRestServerBindAddress); timelineRestServerBindAddress);
} }
private void reportNewCollectorToNM(ApplicationId appId, private void reportNewCollectorInfoToNM(ApplicationId appId,
org.apache.hadoop.yarn.api.records.Token token) org.apache.hadoop.yarn.api.records.Token token)
throws YarnException, IOException { throws YarnException, IOException {
ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest request =
@ -321,4 +362,43 @@ protected CollectorNodemanagerProtocol getNMCollectorService() {
public String getRestServerBindAddress() { public String getRestServerBindAddress() {
return timelineRestServerBindAddress; return timelineRestServerBindAddress;
} }
private final class CollectorTokenRenewer implements Runnable {
private ApplicationId appId;
private CollectorTokenRenewer(ApplicationId applicationId) {
appId = applicationId;
}
@Override
public void run() {
TimelineCollector collector = get(appId);
if (collector == null) {
LOG.info("Cannot find active collector while renewing token for " +
appId);
return;
}
AppLevelTimelineCollector appCollector =
(AppLevelTimelineCollector) collector;
synchronized (collector) {
if (!collector.isStopped()) {
try {
long newExpirationTime = renewTokenForAppCollector(appCollector);
if (newExpirationTime > 0) {
long renewInterval = newExpirationTime - Time.now();
long renewalDelay = (renewInterval > TIME_BEFORE_RENEW_DATE) ?
renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval;
LOG.info("Renewed token for " + appId + " with new expiration " +
"timestamp = " + newExpirationTime);
Future<?> renewalFuture = tokenRenewalExecutor.schedule(
this, renewalDelay, TimeUnit.MILLISECONDS);
appCollector.setRenewalFutureForApp(renewalFuture);
}
} catch (Exception e) {
LOG.warn("Unable to renew token for " + appId);
}
}
}
}
}
} }

View File

@ -63,6 +63,8 @@ public abstract class TimelineCollector extends CompositeService {
private volatile boolean readyToAggregate = false; private volatile boolean readyToAggregate = false;
private volatile boolean isStopped = false;
public TimelineCollector(String name) { public TimelineCollector(String name) {
super(name); super(name);
} }
@ -79,9 +81,14 @@ protected void serviceStart() throws Exception {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
isStopped = true;
super.serviceStop(); super.serviceStop();
} }
boolean isStopped() {
return isStopped;
}
protected void setWriter(TimelineWriter w) { protected void setWriter(TimelineWriter w) {
this.writer = w; this.writer = w;
} }

View File

@ -184,9 +184,11 @@ public boolean remove(ApplicationId appId) {
if (collector == null) { if (collector == null) {
LOG.error("the collector for " + appId + " does not exist!"); LOG.error("the collector for " + appId + " does not exist!");
} else { } else {
postRemove(appId, collector); synchronized (collector) {
// stop the service to do clean up postRemove(appId, collector);
collector.stop(); // stop the service to do clean up
collector.stop();
}
LOG.info("The collector service for " + appId + " was removed"); LOG.info("The collector service for " + appId + " was removed");
} }
return collector != null; return collector != null;

View File

@ -34,6 +34,7 @@
*/ */
public class TimelineV2DelegationTokenSecretManagerService extends public class TimelineV2DelegationTokenSecretManagerService extends
TimelineDelgationTokenSecretManagerService { TimelineDelgationTokenSecretManagerService {
public TimelineV2DelegationTokenSecretManagerService() { public TimelineV2DelegationTokenSecretManagerService() {
super(TimelineV2DelegationTokenSecretManagerService.class.getName()); super(TimelineV2DelegationTokenSecretManagerService.class.getName());
} }
@ -54,6 +55,11 @@ public Token<TimelineDelegationTokenIdentifier> generateToken(
getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer); getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer);
} }
public long renewToken(Token<TimelineDelegationTokenIdentifier> token,
String renewer) throws IOException {
return getTimelineDelegationTokenSecretManager().renewToken(token, renewer);
}
public void cancelToken(Token<TimelineDelegationTokenIdentifier> token, public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
String canceller) throws IOException { String canceller) throws IOException {
getTimelineDelegationTokenSecretManager().cancelToken(token, canceller); getTimelineDelegationTokenSecretManager().cancelToken(token, canceller);