YARN-674. Fixed ResourceManager to renew DelegationTokens on submission asynchronously to work around potential slowness in state-store. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1543312 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1543313 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-19 05:21:45 +00:00
parent 8613e7ef3d
commit 0b57905309
8 changed files with 519 additions and 168 deletions

View File

@ -89,6 +89,10 @@ Release 2.3.0 - UNRELEASED
ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
vinodkv)
YARN-674. Fixed ResourceManager to renew DelegationTokens on submission
asynchronously to work around potential slowness in state-store. (Omkar Vinit
Joshi via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -505,6 +505,11 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
/** Delegation Token renewer thread count */
public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
RM_PREFIX + "delegation-token-renewer.thread-count";
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";

View File

@ -318,7 +318,7 @@ public SubmitApplicationResponse submitApplication(
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), false, user);
System.currentTimeMillis(), user, false, null);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -240,31 +241,59 @@ protected synchronized void checkAppNumCompletedLimit() {
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
boolean isRecovered, String user) throws YarnException {
String user, boolean isRecovered, RMState state) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
// checked here as they have to be validated whether they are part of new
// submission or just being recovered.
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user);
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
if (isRecovered) {
recoverApplication(state, application);
RMAppState rmAppState =
state.getApplicationState().get(applicationId).getState();
if (isApplicationInFinalState(rmAppState)) {
// We are synchronously moving the application into final state so that
// momentarily client will not see this application in NEW state. Also
// for finished applications we will avoid renewing tokens.
application
.handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
return;
}
}
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null;
try {
SchedulerUtils.validateResourceRequest(amReq,
scheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + applicationId, e);
throw e;
credentials = parseCredentials(submissionContext);
} catch (Exception e) {
LOG.warn(
"Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
this.rmContext.getDelegationTokenRenewer().addApplication(
applicationId, credentials,
submissionContext.getCancelTokensWhenComplete(), isRecovered);
} else {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
}
}
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext,
long submitTime, String user)
throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
validateResourceRequest(submissionContext);
// Create RMApp
RMApp application =
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
@ -281,35 +310,52 @@ protected void submitApplication(
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
return application;
}
private void validateResourceRequest(
ApplicationSubmissionContext submissionContext)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
// checked here as they have to be validated whether they are part of new
// submission or just being recovered.
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
try {
// Setup tokens for renewal
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer().addApplication(
applicationId,parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete()
);
SchedulerUtils.validateResourceRequest(amReq,
scheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}
}
} catch (IOException ie) {
LOG.warn(
"Unable to add the application to the delegation token renewer.",
ie);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we havne't yet informed the
// Scheduler about the existence of the application
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(applicationId, ie.getMessage()));
throw RPCUtil.getRemoteException(ie);
}
if (!isRecovered) {
// All done, start the RMApp
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
private void recoverApplication(RMState state, RMAppImpl application)
throws YarnException {
try {
application.recover(state);
} catch (Exception e) {
LOG.error("Error recovering application", e);
throw new YarnException(e);
}
}
private boolean isApplicationInFinalState(RMAppState rmAppState) {
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|| rmAppState == RMAppState.KILLED) {
return true;
} else {
return false;
}
}
@ -335,17 +381,9 @@ public void recover(RMState state) throws Exception {
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime(), true, appState.getUser());
// re-populate attempt information in application
RMAppImpl appImpl =
(RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
appImpl.recover(state);
// Recover the app synchronously, as otherwise client is possible to see
// the application not recovered before it is actually recovered because
// ClientRMService is already started at this point of time.
appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
RMAppEventType.RECOVER));
appState.getSubmitTime(), appState.getUser(), true, state);
}
}

View File

@ -34,6 +34,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@ -48,10 +52,15 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Service to renew application delegation tokens.
@ -72,6 +81,7 @@ public class DelegationTokenRenewer extends AbstractService {
// delegation token canceler thread
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
private ThreadPoolExecutor renewerService;
// managing the list of tokens using Map
// appId=>List<tokens>
@ -84,9 +94,9 @@ public class DelegationTokenRenewer extends AbstractService {
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
private boolean isServiceStarted = false;
private List<DelegationTokenToRenew> pendingTokenForRenewal =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
private volatile boolean isServiceStarted;
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
private boolean tokenKeepAliveEnabled;
@ -102,9 +112,27 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
super.serviceInit(conf);
}
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
int nThreads = conf.getInt(
YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("DelegationTokenRenewer #%d")
.build();
ThreadPoolExecutor pool =
new ThreadPoolExecutor((5 < nThreads ? 5 : nThreads), nThreads, 3L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
pool.setThreadFactory(tf);
pool.allowCoreThreadTimeOut(true);
return pool;
}
@Override
protected void serviceStart() throws Exception {
dtCancelThread.start();
@ -119,21 +147,36 @@ protected void serviceStart() throws Exception {
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
// Delegation token renewal is delayed until ClientRMService starts. As
// it is required to short circuit the token renewal calls.
serviceStateLock.writeLock().lock();
isServiceStarted = true;
renewIfServiceIsStarted(pendingTokenForRenewal);
pendingTokenForRenewal.clear();
serviceStateLock.writeLock().unlock();
while(!pendingEventQueue.isEmpty()) {
processDelegationTokenRewewerEvent(pendingEventQueue.take());
}
super.serviceStart();
}
private void processDelegationTokenRewewerEvent(
DelegationTokenRenewerEvent evt) {
serviceStateLock.readLock().lock();
try {
if (isServiceStarted) {
renewerService.execute(new DelegationTokenRenewerRunnable(evt));
} else {
pendingEventQueue.add(evt);
}
} finally {
serviceStateLock.readLock().unlock();
}
}
@Override
protected void serviceStop() {
if (renewalTimer != null) {
renewalTimer.cancel();
}
delegationTokens.clear();
this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
dtCancelThread.join(1000);
@ -290,10 +333,20 @@ public Set<Token<?>> getDelegationTokens() {
* @throws IOException
*/
public void addApplication(
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
boolean isApplicationRecovered) {
processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
applicationId, ts,
shouldCancelAtEnd, isApplicationRecovered));
}
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
throws IOException {
ApplicationId applicationId = evt.getApplicationId();
Credentials ts = evt.getCredentials();
boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
if (ts == null) {
return; //nothing to add
return; // nothing to add
}
if (LOG.isDebugEnabled()) {
@ -301,7 +354,7 @@ public void addApplication(
" appId = " + applicationId);
}
Collection <Token<?>> tokens = ts.getAllTokens();
Collection<Token<?>> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
// find tokens for renewal, but don't add timers until we know
@ -310,27 +363,20 @@ public void addApplication(
// are valid
List<DelegationTokenToRenew> tokenList =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
for(Token<?> token : tokens) {
for (Token<?> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
}
}
if (!tokenList.isEmpty()){
renewIfServiceIsStarted(tokenList);
}
}
protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
throws IOException {
if (isServiceStarted) {
if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
// renewal.
for (DelegationTokenToRenew dtr : dtrs) {
for (DelegationTokenToRenew dtr : tokenList) {
renewToken(dtr);
}
for (DelegationTokenToRenew dtr : dtrs) {
for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
@ -338,8 +384,6 @@ protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
} else {
pendingTokenForRenewal.addAll(dtrs);
}
}
@ -449,10 +493,16 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
* @param applicationId completed application
*/
public void applicationFinished(ApplicationId applicationId) {
processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
applicationId,
DelegationTokenRenewerEventType.FINISH_APPLICATION));
}
private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
if (!tokenKeepAliveEnabled) {
removeApplicationFromRenewal(applicationId);
removeApplicationFromRenewal(evt.getApplicationId());
} else {
delayedRemovalMap.put(applicationId, System.currentTimeMillis()
delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
@ -546,4 +596,111 @@ public void run() {
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
/*
* This will run as a separate thread and will process individual events. It
* is done in this way to make sure that the token renewal as a part of
* application submission and token removal as a part of application finish
* is asynchronous in nature.
*/
private final class DelegationTokenRenewerRunnable
implements Runnable {
private DelegationTokenRenewerEvent evt;
public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
this.evt = evt;
}
@Override
public void run() {
if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
(DelegationTokenRenewerAppSubmitEvent) evt;
handleDTRenewerAppSubmitEvent(appSubmitEvt);
} else if (evt.getType().equals(
DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
DelegationTokenRenewer.this.handleAppFinishEvent(evt);
}
}
@SuppressWarnings("unchecked")
private void handleDTRenewerAppSubmitEvent(
DelegationTokenRenewerAppSubmitEvent event) {
/*
* For applications submitted with delegation tokens we are not submitting
* the application to scheduler from RMAppManager. Instead we are doing
* it from here. The primary goal is to make token renewal as a part of
* application submission asynchronous so that client thread is not
* blocked during app submission.
*/
try {
// Setup tokens for renewal
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(event.getApplicationId(),
event.isApplicationRecovered() ? RMAppEventType.RECOVER
: RMAppEventType.START));
} catch (Throwable t) {
LOG.warn(
"Unable to add the application to the delegation token renewer.",
t);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we havne't yet informed the
// Scheduler about the existence of the application
rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
}
}
}
class DelegationTokenRenewerAppSubmitEvent extends
DelegationTokenRenewerEvent {
private Credentials credentials;
private boolean shouldCancelAtEnd;
private boolean isAppRecovered;
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
Credentials credentails, boolean shouldCancelAtEnd,
boolean isApplicationRecovered) {
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
this.credentials = credentails;
this.shouldCancelAtEnd = shouldCancelAtEnd;
this.isAppRecovered = isApplicationRecovered;
}
public Credentials getCredentials() {
return credentials;
}
public boolean shouldCancelAtEnd() {
return shouldCancelAtEnd;
}
public boolean isApplicationRecovered() {
return isAppRecovered;
}
}
enum DelegationTokenRenewerEventType {
VERIFY_AND_START_APPLICATION,
FINISH_APPLICATION
}
class DelegationTokenRenewerEvent extends
AbstractEvent<DelegationTokenRenewerEventType> {
private ApplicationId appId;
public DelegationTokenRenewerEvent(ApplicationId appId,
DelegationTokenRenewerEventType type) {
super(type);
this.appId = appId;
}
public ApplicationId getApplicationId() {
return appId;
}
}
}

View File

@ -172,7 +172,7 @@ public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
throws YarnException {
super.submitApplication(submissionContext, System.currentTimeMillis(),
false, user);
user, false, null);
}
}

View File

@ -1009,6 +1009,10 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
// Need to wait for a while as now token renewal happens on another thread
// and is asynchronous in nature.
waitForTokensToBeRenewed(rm2);
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
@ -1018,6 +1022,21 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
rm2.stop();
}
private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
int waitCnt = 20;
boolean atleastOneAppInNEWState = true;
while (waitCnt-- > 0 && atleastOneAppInNEWState) {
atleastOneAppInNEWState = false;
for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
if (rmApp.getState() == RMAppState.NEW) {
Thread.sleep(1000);
atleastOneAppInNEWState = true;
break;
}
}
}
}
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);

View File

@ -31,13 +31,24 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -46,16 +57,29 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@ -66,14 +90,18 @@
/**
* unit test -
* tests addition/deletion/cancelation of renewals of delegation tokens
* tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
@SuppressWarnings("rawtypes")
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
private static BlockingQueue<Event> eventQueue;
private static volatile AtomicInteger counter;
private static AsyncDispatcher dispatcher;
public static class Renewer extends TokenRenewer {
private static int counter = 0;
private static Token<?> lastRenewed = null;
@ -143,11 +171,20 @@ public static void setUpClass() throws Exception {
@Before
public void setUp() throws Exception {
counter = new AtomicInteger(0);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
eventQueue = new LinkedBlockingQueue<Event>();
dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
delegationTokenRenewer = new DelegationTokenRenewer();
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
delegationTokenRenewer);
when(mockContext.getDispatcher()).thenReturn(dispatcher);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
@ -285,7 +322,7 @@ static MyToken createTokens(Text renewer)
* @throws IOException
* @throws URISyntaxException
*/
@Test
@Test(timeout=60000)
public void testDTRenewal () throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@ -316,7 +353,8 @@ public void testDTRenewal () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplication(applicationId_0, ts, true);
delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
waitForEventsToGetProcessed(delegationTokenRenewer);
// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
@ -355,9 +393,10 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, true);
delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@ -377,8 +416,8 @@ public void testDTRenewal () throws Exception {
}
}
@Test
public void testInvalidDTWithAddApplication() throws Exception {
@Test(timeout=60000)
public void testAppRejectionWithCancelledDelegationToken() throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@ -390,12 +429,21 @@ public void testInvalidDTWithAddApplication() throws Exception {
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
try {
delegationTokenRenewer.addApplication(appId, ts, true);
fail("App submission with a cancelled token should have failed");
} catch (InvalidToken e) {
// expected
delegationTokenRenewer.addApplication(appId, ts, true, false);
int waitCnt = 20;
while (waitCnt-- >0) {
if (!eventQueue.isEmpty()) {
Event evt = eventQueue.take();
if (evt.getType() == RMAppEventType.APP_REJECTED) {
Assert.assertTrue(
((RMAppEvent) evt).getApplicationId().equals(appId));
return;
}
} else {
Thread.sleep(500);
}
}
fail("App submission with a cancelled token should have failed");
}
/**
@ -408,7 +456,7 @@ public void testInvalidDTWithAddApplication() throws Exception {
* @throws IOException
* @throws URISyntaxException
*/
@Test
@Test(timeout=60000)
public void testDTRenewalWithNoCancel () throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@ -425,9 +473,10 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, false);
delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@ -454,9 +503,8 @@ public void testDTRenewalWithNoCancel () throws Exception {
* @throws IOException
* @throws URISyntaxException
*/
@Test
@Test(timeout=60000)
public void testDTKeepAlive1 () throws Exception {
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
@ -465,10 +513,15 @@ public void testDTKeepAlive1 () throws Exception {
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(lconf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
localDtr);
when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@ -487,16 +540,25 @@ public void testDTKeepAlive1 () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true);
localDtr.applicationFinished(applicationId_0);
localDtr.addApplication(applicationId_0, ts, true, false);
waitForEventsToGetProcessed(localDtr);
if (!eventQueue.isEmpty()){
Event evt = eventQueue.take();
if (evt instanceof RMAppEvent) {
Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
} else {
fail("RMAppEvent.START was expected!!");
}
}
Thread.sleep(3000l);
localDtr.applicationFinished(applicationId_0);
waitForEventsToGetProcessed(localDtr);
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
Thread.sleep(6000l);
Thread.sleep(10000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
@ -518,9 +580,8 @@ public void testDTKeepAlive1 () throws Exception {
* @throws IOException
* @throws URISyntaxException
*/
@Test
@Test(timeout=60000)
public void testDTKeepAlive2() throws Exception {
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
@ -529,10 +590,15 @@ public void testDTKeepAlive2() throws Exception {
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(conf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
localDtr);
when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@ -551,22 +617,18 @@ public void testDTKeepAlive2() throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true);
localDtr.addApplication(applicationId_0, ts, true, false);
localDtr.applicationFinished(applicationId_0);
Thread.sleep(4000l);
waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
@ -576,8 +638,45 @@ public void testDTKeepAlive2() throws Exception {
} catch (InvalidToken ite) {}
}
private DelegationTokenRenewer createNewDelegationTokenRenewer(
Configuration conf, final AtomicInteger counter) {
return new DelegationTokenRenewer() {
@Override
protected ThreadPoolExecutor
createNewThreadPoolService(Configuration conf) {
ThreadPoolExecutor pool =
new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
counter.decrementAndGet();
super.afterExecute(r, t);
}
@Override
public void execute(Runnable command) {
counter.incrementAndGet();
super.execute(command);
}
};
return pool;
}
};
}
private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
throws InterruptedException {
int wait = 40;
while (wait-- > 0
&& counter.get() > 0) {
Thread.sleep(200);
}
}
@Test(timeout=20000)
public void testConncurrentAddApplication()
public void testConcurrentAddApplication()
throws IOException, InterruptedException, BrokenBarrierException {
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
@ -603,7 +702,8 @@ public Long answer(InvocationOnMock invocation)
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
// fire up the renewer
final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter);
dtr.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
@ -612,24 +712,52 @@ public Long answer(InvocationOnMock invocation)
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
dtr.setRMContext(mockContext);
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
dtr.start();
// submit a job that blocks during renewal
Thread submitThread = new Thread() {
@Override
public void run() {
try {
dtr.addApplication(mock(ApplicationId.class), creds1, false);
} catch (IOException e) {}
dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
}
};
submitThread.start();
// wait till 1st submit blocks, then submit another
startBarrier.await();
dtr.addApplication(mock(ApplicationId.class), creds2, false);
dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
// signal 1st to complete
endBarrier.await();
submitThread.join();
}
@Test(timeout=20000)
public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
Configuration conf = new Configuration();
conf.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
MockRM rm = new MockRM(conf);
ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(
new HashMap<String, LocalResource>(), new HashMap<String, String>(),
new ArrayList<String>(), new HashMap<String, ByteBuffer>(), tokens,
new HashMap<ApplicationAccessType, String>());
ApplicationSubmissionContext appSubContext =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(1234121, 0),
"BOGUS", "default", Priority.UNDEFINED, amContainer, false,
true, 1, Resource.newInstance(1024, 1), "BOGUS");
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(appSubContext);
try {
rm.getClientRMService().submitApplication(request);
fail("Error was excepted.");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(
"Bad header found in token storage"));
}
}
}