YARN-6133. [ATSv2 Security] Renew delegation token for app automatically if an app collector is active. Contributed by Varun Saxena.

(cherry picked from commit ffb1f572b68e73efd6410ab74a334f4e5df543f2)
This commit is contained in:
Rohith Sharma K S 2017-08-10 11:12:57 +05:30 committed by Varun Saxena
parent 5a7c272e0a
commit cb75e1826e
6 changed files with 139 additions and 14 deletions

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ -183,6 +184,13 @@ public class TestTimelineAuthFilterForV2 {
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTP_ONLY.name());
}
if (!withKerberosLogin) {
// For timeline delegation token based access, set delegation token renew
// interval to 100 ms. to test if timeline delegation token for the app is
// renewed automatically if app is still alive.
conf.setLong(
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, 100);
}
UserGroupInformation.setConfiguration(conf);
collectorManager = new DummyNodeTimelineCollectorManager();
auxService = PerNodeTimelineCollectorsAuxService.launchServer(
@ -282,12 +290,12 @@ public class TestTimelineAuthFilterForV2 {
}
private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir,
String entityType) throws Exception {
String entityType, int numEntities) throws Exception {
TimelineV2Client client = createTimelineClientForUGI(appId);
try {
// Sync call. Results available immediately.
client.putEntities(createEntity("entity1", entityType));
assertEquals(1, entityTypeDir.listFiles().length);
assertEquals(numEntities, entityTypeDir.listFiles().length);
verifyEntity(entityTypeDir, "entity1", entityType);
// Async call.
client.putEntitiesAsync(createEntity("entity2", entityType));
@ -312,12 +320,22 @@ public class TestTimelineAuthFilterForV2 {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override
public Void call() throws Exception {
publishAndVerifyEntity(appId, entityTypeDir, entityType);
publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
return null;
}
});
} else {
publishAndVerifyEntity(appId, entityTypeDir, entityType);
publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
// Verify if token is renewed automatically and entities can still be
// published.
Thread.sleep(1000);
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector) collectorManager.get(appId);
assertNotNull(collector);
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
renewToken(eq(collector.getDelegationTokenForApp()),
any(String.class));
}
// Wait for async entity to be published.
for (int i = 0; i < 50; i++) {
@ -330,6 +348,7 @@ public class TestTimelineAuthFilterForV2 {
verifyEntity(entityTypeDir, "entity2", entityType);
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector)collectorManager.get(appId);
assertNotNull(collector);
auxService.removeApplication(appId);
verify(collectorManager.getTokenManagerService()).cancelToken(
eq(collector.getDelegationTokenForApp()), any(String.class));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -48,6 +50,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
private final TimelineCollectorContext context;
private UserGroupInformation currentUser;
private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp;
private Future<?> renewalFuture;
public AppLevelTimelineCollector(ApplicationId appId) {
this(appId, null);
@ -69,9 +72,15 @@ public class AppLevelTimelineCollector extends TimelineCollector {
return appUser;
}
void setDelegationTokenForApp(
Token<TimelineDelegationTokenIdentifier> token) {
void setDelegationTokenAndFutureForApp(
Token<TimelineDelegationTokenIdentifier> token,
Future<?> appRenewalFuture) {
this.delegationTokenForApp = token;
this.renewalFuture = appRenewalFuture;
}
void setRenewalFutureForApp(Future<?> appRenewalFuture) {
this.renewalFuture = appRenewalFuture;
}
@VisibleForTesting
@ -99,6 +108,9 @@ public class AppLevelTimelineCollector extends TimelineCollector {
@Override
protected void serviceStop() throws Exception {
if (renewalFuture != null && !renewalFuture.isDone()) {
renewalFuture.cancel(true);
}
super.serviceStop();
}
@ -106,5 +118,4 @@ public class AppLevelTimelineCollector extends TimelineCollector {
public TimelineCollectorContext getTimelineEntityContext() {
return context;
}
}

View File

@ -23,6 +23,9 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,6 +37,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -52,6 +56,7 @@ import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Class on the NodeManager side that manages adding and removing collectors and
@ -76,6 +81,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private UserGroupInformation loginUGI;
private ScheduledThreadPoolExecutor tokenRenewalExecutor;
private long tokenRenewInterval;
private static final long TIME_BEFORE_RENEW_DATE = 10 * 1000; // 10 seconds.
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
@VisibleForTesting
@ -93,6 +104,9 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
tokenMgrService = createTokenManagerService();
addService(tokenMgrService);
this.loginUGI = UserGroupInformation.getCurrentUser();
tokenRenewInterval = conf.getLong(
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
super.serviceInit(conf);
}
@ -109,6 +123,9 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
}
this.loginUGI = UserGroupInformation.getLoginUser();
}
tokenRenewalExecutor = new ScheduledThreadPoolExecutor(
1, new ThreadFactoryBuilder().setNameFormat(
"App Collector Token Renewal thread").build());
super.serviceStart();
startWebApp();
}
@ -139,6 +156,9 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
if (timelineRestServer != null) {
timelineRestServer.stop();
}
if (tokenRenewalExecutor != null) {
tokenRenewalExecutor.shutdownNow();
}
super.serviceStop();
}
@ -152,6 +172,21 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
return token;
}
@VisibleForTesting
public long renewTokenForAppCollector(
AppLevelTimelineCollector appCollector) throws IOException {
if (appCollector.getDelegationTokenForApp() != null) {
TimelineDelegationTokenIdentifier identifier =
appCollector.getDelegationTokenForApp().decodeIdentifier();
return tokenMgrService.renewToken(appCollector.getDelegationTokenForApp(),
identifier.getRenewer().toString());
} else {
LOG.info("Delegation token not available for renewal for app " +
appCollector.getTimelineEntityContext().getAppId());
return -1;
}
}
@VisibleForTesting
public void cancelTokenForAppCollector(
AppLevelTimelineCollector appCollector) throws IOException {
@ -174,13 +209,19 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
(AppLevelTimelineCollector)collector;
Token<TimelineDelegationTokenIdentifier> timelineToken =
generateTokenForAppCollector(appCollector.getAppUser());
appCollector.setDelegationTokenForApp(timelineToken);
long renewalDelay = (tokenRenewInterval > TIME_BEFORE_RENEW_DATE) ?
tokenRenewInterval - TIME_BEFORE_RENEW_DATE : tokenRenewInterval;
Future<?> renewalFuture =
tokenRenewalExecutor.schedule(new CollectorTokenRenewer(appId),
renewalDelay, TimeUnit.MILLISECONDS);
appCollector.setDelegationTokenAndFutureForApp(timelineToken,
renewalFuture);
token = org.apache.hadoop.yarn.api.records.Token.newInstance(
timelineToken.getIdentifier(), timelineToken.getKind().toString(),
timelineToken.getPassword(), timelineToken.getService().toString());
}
// Report to NM if a new collector is added.
reportNewCollectorToNM(appId, token);
reportNewCollectorInfoToNM(appId, token);
} catch (YarnException | IOException e) {
// throw exception here as it cannot be used if failed communicate with NM
LOG.error("Failed to communicate with NM Collector Service for " + appId);
@ -192,7 +233,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
protected void postRemove(ApplicationId appId, TimelineCollector collector) {
if (collector instanceof AppLevelTimelineCollector) {
try {
cancelTokenForAppCollector((AppLevelTimelineCollector)collector);
cancelTokenForAppCollector((AppLevelTimelineCollector) collector);
} catch (IOException e) {
LOG.warn("Failed to cancel token for app collector with appId " +
appId, e);
@ -244,7 +285,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
timelineRestServerBindAddress);
}
private void reportNewCollectorToNM(ApplicationId appId,
private void reportNewCollectorInfoToNM(ApplicationId appId,
org.apache.hadoop.yarn.api.records.Token token)
throws YarnException, IOException {
ReportNewCollectorInfoRequest request =
@ -321,4 +362,43 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
public String getRestServerBindAddress() {
return timelineRestServerBindAddress;
}
private final class CollectorTokenRenewer implements Runnable {
private ApplicationId appId;
private CollectorTokenRenewer(ApplicationId applicationId) {
appId = applicationId;
}
@Override
public void run() {
TimelineCollector collector = get(appId);
if (collector == null) {
LOG.info("Cannot find active collector while renewing token for " +
appId);
return;
}
AppLevelTimelineCollector appCollector =
(AppLevelTimelineCollector) collector;
synchronized (collector) {
if (!collector.isStopped()) {
try {
long newExpirationTime = renewTokenForAppCollector(appCollector);
if (newExpirationTime > 0) {
long renewInterval = newExpirationTime - Time.now();
long renewalDelay = (renewInterval > TIME_BEFORE_RENEW_DATE) ?
renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval;
LOG.info("Renewed token for " + appId + " with new expiration " +
"timestamp = " + newExpirationTime);
Future<?> renewalFuture = tokenRenewalExecutor.schedule(
this, renewalDelay, TimeUnit.MILLISECONDS);
appCollector.setRenewalFutureForApp(renewalFuture);
}
} catch (Exception e) {
LOG.warn("Unable to renew token for " + appId);
}
}
}
}
}
}

View File

@ -62,6 +62,8 @@ public abstract class TimelineCollector extends CompositeService {
private volatile boolean readyToAggregate = false;
private volatile boolean isStopped = false;
public TimelineCollector(String name) {
super(name);
}
@ -78,9 +80,14 @@ public abstract class TimelineCollector extends CompositeService {
@Override
protected void serviceStop() throws Exception {
isStopped = true;
super.serviceStop();
}
boolean isStopped() {
return isStopped;
}
protected void setWriter(TimelineWriter w) {
this.writer = w;
}

View File

@ -184,9 +184,11 @@ public class TimelineCollectorManager extends CompositeService {
if (collector == null) {
LOG.error("the collector for " + appId + " does not exist!");
} else {
postRemove(appId, collector);
// stop the service to do clean up
collector.stop();
synchronized (collector) {
postRemove(appId, collector);
// stop the service to do clean up
collector.stop();
}
LOG.info("The collector service for " + appId + " was removed");
}
return collector != null;

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSec
*/
public class TimelineV2DelegationTokenSecretManagerService extends
TimelineDelgationTokenSecretManagerService {
public TimelineV2DelegationTokenSecretManagerService() {
super(TimelineV2DelegationTokenSecretManagerService.class.getName());
}
@ -54,6 +55,11 @@ public class TimelineV2DelegationTokenSecretManagerService extends
getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer);
}
public long renewToken(Token<TimelineDelegationTokenIdentifier> token,
String renewer) throws IOException {
return getTimelineDelegationTokenSecretManager().renewToken(token, renewer);
}
public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
String canceller) throws IOException {
getTimelineDelegationTokenSecretManager().cancelToken(token, canceller);