diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ba851de872a..29c8b8049c7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -89,6 +89,10 @@ Release 2.3.0 - UNRELEASED ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via vinodkv) + YARN-674. Fixed ResourceManager to renew DelegationTokens on submission + asynchronously to work around potential slowness in state-store. (Omkar Vinit + Joshi via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 961d6ddd03a..72ad08f1482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -504,6 +504,11 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; + + /** Delegation Token renewer thread count */ + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = + RM_PREFIX + "delegation-token-renewer.thread-count"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50; /** Whether to enable log aggregation */ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 
d94bc9e5186..c3410a9b462 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -318,7 +318,7 @@ public class ClientRMService extends AbstractService implements try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, - System.currentTimeMillis(), false, user); + System.currentTimeMillis(), user, false, null); LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 9df9fa29603..4dfa3ba6d22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -236,35 +237,63 @@ public class RMAppManager implements EventHandler, this.applicationACLsManager.removeApplication(removeId); } } - + @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, - boolean isRecovered, String user) throws YarnException { + String user, boolean isRecovered, RMState state) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); - // Validation of the ApplicationSubmissionContext needs to be completed - // here. Only those fields that are dependent on RM's configuration are - // checked here as they have to be validated whether they are part of new - // submission or just being recovered. + RMAppImpl application = + createAndPopulateNewRMApp(submissionContext, submitTime, user); - // Check whether AM resource requirements are within required limits - if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); - try { - SchedulerUtils.validateResourceRequest(amReq, - scheduler.getMaximumResourceCapability()); - } catch (InvalidResourceRequestException e) { - LOG.warn("RM app submission failed in validating AM resource request" - + " for application " + applicationId, e); - throw e; + if (isRecovered) { + recoverApplication(state, application); + RMAppState rmAppState = + state.getApplicationState().get(applicationId).getState(); + if (isApplicationInFinalState(rmAppState)) { + // We are synchronously moving the application into final state so that + // momentarily client will not see this application in NEW state. Also + // for finished applications we will avoid renewing tokens. 
+ application + .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER)); + return; } } + + if (UserGroupInformation.isSecurityEnabled()) { + Credentials credentials = null; + try { + credentials = parseCredentials(submissionContext); + } catch (Exception e) { + LOG.warn( + "Unable to parse credentials.", e); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we haven't yet informed the + // scheduler about the existence of the application + assert application.getState() == RMAppState.NEW; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(applicationId, e.getMessage())); + throw RPCUtil.getRemoteException(e); + } + this.rmContext.getDelegationTokenRenewer().addApplication( + applicationId, credentials, + submissionContext.getCancelTokensWhenComplete(), isRecovered); + } else { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, + isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START)); + } + } + private RMAppImpl createAndPopulateNewRMApp( + ApplicationSubmissionContext submissionContext, + long submitTime, String user) + throws YarnException { + ApplicationId applicationId = submissionContext.getApplicationId(); + validateResourceRequest(submissionContext); // Create RMApp - RMApp application = + RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), @@ -281,35 +310,52 @@ public class RMAppManager implements EventHandler, LOG.warn(message); throw RPCUtil.getRemoteException(message); } - // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); + return application; + } - try { - // Setup tokens for renewal - if (UserGroupInformation.isSecurityEnabled()) { - this.rmContext.getDelegationTokenRenewer().addApplication( - 
applicationId,parseCredentials(submissionContext), - submissionContext.getCancelTokensWhenComplete() - ); + private void validateResourceRequest( + ApplicationSubmissionContext submissionContext) + throws InvalidResourceRequestException { + // Validation of the ApplicationSubmissionContext needs to be completed + // here. Only those fields that are dependent on RM's configuration are + // checked here as they have to be validated whether they are part of new + // submission or just being recovered. + + // Check whether AM resource requirements are within required limits + if (!submissionContext.getUnmanagedAM()) { + ResourceRequest amReq = BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1); + try { + SchedulerUtils.validateResourceRequest(amReq, + scheduler.getMaximumResourceCapability()); + } catch (InvalidResourceRequestException e) { + LOG.warn("RM app submission failed in validating AM resource request" + + " for application " + submissionContext.getApplicationId(), e); + throw e; } - } catch (IOException ie) { - LOG.warn( - "Unable to add the application to the delegation token renewer.", - ie); - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we havne't yet informed the - // Scheduler about the existence of the application - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, ie.getMessage())); - throw RPCUtil.getRemoteException(ie); } + } - if (!isRecovered) { - // All done, start the RMApp - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.START)); + private void recoverApplication(RMState state, RMAppImpl application) + throws YarnException { + try { + application.recover(state); + } catch (Exception e) { + LOG.error("Error recovering application", e); + throw new YarnException(e); + } + } + + private boolean 
isApplicationInFinalState(RMAppState rmAppState) { + if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED + || rmAppState == RMAppState.KILLED) { + return true; + } else { + return false; } } @@ -335,17 +381,9 @@ public class RMAppManager implements EventHandler, LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { LOG.info("Recovering application " + appState.getAppId()); + submitApplication(appState.getApplicationSubmissionContext(), - appState.getSubmitTime(), true, appState.getUser()); - // re-populate attempt information in application - RMAppImpl appImpl = - (RMAppImpl) rmContext.getRMApps().get(appState.getAppId()); - appImpl.recover(state); - // Recover the app synchronously, as otherwise client is possible to see - // the application not recovered before it is actually recovered because - // ClientRMService is already started at this point of time. - appImpl.handle(new RMAppEvent(appImpl.getApplicationId(), - RMAppEventType.RECOVER)); + appState.getSubmitTime(), appState.getUser(), true, state); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a58b9175f8e..ce9f7ae0625 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -34,6 +34,10 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -48,10 +52,15 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Service to renew application delegation tokens. 
@@ -72,7 +81,8 @@ public class DelegationTokenRenewer extends AbstractService { // delegation token canceler thread private DelegationTokenCancelThread dtCancelThread = new DelegationTokenCancelThread(); - + private ThreadPoolExecutor renewerService; + // managing the list of tokens using Map // appId=>List private Set delegationTokens = @@ -84,9 +94,9 @@ public class DelegationTokenRenewer extends AbstractService { private long tokenRemovalDelayMs; private Thread delayedRemovalThread; - private boolean isServiceStarted = false; - private List pendingTokenForRenewal = - new ArrayList(); + private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock(); + private volatile boolean isServiceStarted; + private LinkedBlockingQueue pendingEventQueue; private boolean tokenKeepAliveEnabled; @@ -102,9 +112,27 @@ public class DelegationTokenRenewer extends AbstractService { this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + renewerService = createNewThreadPoolService(conf); + pendingEventQueue = new LinkedBlockingQueue(); super.serviceInit(conf); } + protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) { + int nThreads = conf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT); + + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("DelegationTokenRenewer #%d") + .build(); + ThreadPoolExecutor pool = + new ThreadPoolExecutor((5 < nThreads ? 
5 : nThreads), nThreads, 3L, + TimeUnit.SECONDS, new LinkedBlockingQueue()); + pool.setThreadFactory(tf); + pool.allowCoreThreadTimeOut(true); + return pool; + } + @Override protected void serviceStart() throws Exception { dtCancelThread.start(); @@ -119,21 +147,36 @@ public class DelegationTokenRenewer extends AbstractService { RMDelegationTokenIdentifier.Renewer.setSecretManager( rmContext.getRMDelegationTokenSecretManager(), rmContext.getClientRMService().getBindAddress()); - // Delegation token renewal is delayed until ClientRMService starts. As - // it is required to short circuit the token renewal calls. + serviceStateLock.writeLock().lock(); isServiceStarted = true; - renewIfServiceIsStarted(pendingTokenForRenewal); - pendingTokenForRenewal.clear(); + serviceStateLock.writeLock().unlock(); + while(!pendingEventQueue.isEmpty()) { + processDelegationTokenRewewerEvent(pendingEventQueue.take()); + } super.serviceStart(); } + private void processDelegationTokenRewewerEvent( + DelegationTokenRenewerEvent evt) { + serviceStateLock.readLock().lock(); + try { + if (isServiceStarted) { + renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + } else { + pendingEventQueue.add(evt); + } + } finally { + serviceStateLock.readLock().unlock(); + } + } + @Override protected void serviceStop() { if (renewalTimer != null) { renewalTimer.cancel(); } delegationTokens.clear(); - + this.renewerService.shutdown(); dtCancelThread.interrupt(); try { dtCancelThread.join(1000); @@ -290,47 +333,50 @@ public class DelegationTokenRenewer extends AbstractService { * @throws IOException */ public void addApplication( - ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd) + ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, + boolean isApplicationRecovered) { + processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent( + applicationId, ts, + shouldCancelAtEnd, isApplicationRecovered)); + } + + private void 
handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) throws IOException { + ApplicationId applicationId = evt.getApplicationId(); + Credentials ts = evt.getCredentials(); + boolean shouldCancelAtEnd = evt.shouldCancelAtEnd(); if (ts == null) { - return; //nothing to add + return; // nothing to add } - + if (LOG.isDebugEnabled()) { - LOG.debug("Registering tokens for renewal for:" + + LOG.debug("Registering tokens for renewal for:" + " appId = " + applicationId); } - - Collection > tokens = ts.getAllTokens(); + + Collection> tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); - + // find tokens for renewal, but don't add timers until we know // all renewable tokens are valid // At RM restart it is safe to assume that all the previously added tokens // are valid List tokenList = new ArrayList(); - for(Token token : tokens) { + for (Token token : tokens) { if (token.isManaged()) { tokenList.add(new DelegationTokenToRenew(applicationId, token, getConfig(), now, shouldCancelAtEnd)); } } - if (!tokenList.isEmpty()){ - renewIfServiceIsStarted(tokenList); - } - } - - protected void renewIfServiceIsStarted(List dtrs) - throws IOException { - if (isServiceStarted) { + if (!tokenList.isEmpty()) { // Renewing token and adding it to timer calls are separated purposefully // If user provides incorrect token then it should not be added for // renewal. 
- for (DelegationTokenToRenew dtr : dtrs) { + for (DelegationTokenToRenew dtr : tokenList) { renewToken(dtr); } - for (DelegationTokenToRenew dtr : dtrs) { + for (DelegationTokenToRenew dtr : tokenList) { addTokenToList(dtr); setTimerForTokenRenewal(dtr); if (LOG.isDebugEnabled()) { @@ -338,11 +384,9 @@ public class DelegationTokenRenewer extends AbstractService { + dtr.token.getService() + " for appId = " + dtr.applicationId); } } - } else { - pendingTokenForRenewal.addAll(dtrs); } } - + /** * Task - to renew a token * @@ -449,14 +493,20 @@ public class DelegationTokenRenewer extends AbstractService { * @param applicationId completed application */ public void applicationFinished(ApplicationId applicationId) { + processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent( + applicationId, + DelegationTokenRenewerEventType.FINISH_APPLICATION)); + } + + private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) { if (!tokenKeepAliveEnabled) { - removeApplicationFromRenewal(applicationId); + removeApplicationFromRenewal(evt.getApplicationId()); } else { - delayedRemovalMap.put(applicationId, System.currentTimeMillis() + delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis() + tokenRemovalDelayMs); } } - + /** * Add a list of applications to the keep alive list. If an appId already * exists, update it's keep-alive time. @@ -546,4 +596,111 @@ public class DelegationTokenRenewer extends AbstractService { public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } + + /* + * This will run as a separate thread and will process individual events. It + * is done in this way to make sure that the token renewal as a part of + * application submission and token removal as a part of application finish + * is asynchronous in nature. 
+ */ + private final class DelegationTokenRenewerRunnable + implements Runnable { + + private DelegationTokenRenewerEvent evt; + + public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) { + this.evt = evt; + } + + @Override + public void run() { + if (evt instanceof DelegationTokenRenewerAppSubmitEvent) { + DelegationTokenRenewerAppSubmitEvent appSubmitEvt = + (DelegationTokenRenewerAppSubmitEvent) evt; + handleDTRenewerAppSubmitEvent(appSubmitEvt); + } else if (evt.getType().equals( + DelegationTokenRenewerEventType.FINISH_APPLICATION)) { + DelegationTokenRenewer.this.handleAppFinishEvent(evt); + } + } + + @SuppressWarnings("unchecked") + private void handleDTRenewerAppSubmitEvent( + DelegationTokenRenewerAppSubmitEvent event) { + /* + * For applications submitted with delegation tokens we are not submitting + * the application to scheduler from RMAppManager. Instead we are doing + * it from here. The primary goal is to make token renewal as a part of + * application submission asynchronous so that client thread is not + * blocked during app submission. + */ + try { + // Setup tokens for renewal + DelegationTokenRenewer.this.handleAppSubmitEvent(event); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(event.getApplicationId(), + event.isApplicationRecovered() ? 
RMAppEventType.RECOVER + : RMAppEventType.START)); + } catch (Throwable t) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", + t); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we haven't yet informed the + // Scheduler about the existence of the application + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(event.getApplicationId(), t.getMessage())); + } + } + } + + class DelegationTokenRenewerAppSubmitEvent extends + DelegationTokenRenewerEvent { + + private Credentials credentials; + private boolean shouldCancelAtEnd; + private boolean isAppRecovered; + + public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, + boolean isApplicationRecovered) { + super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + this.credentials = credentails; + this.shouldCancelAtEnd = shouldCancelAtEnd; + this.isAppRecovered = isApplicationRecovered; + } + + public Credentials getCredentials() { + return credentials; + } + + public boolean shouldCancelAtEnd() { + return shouldCancelAtEnd; + } + + public boolean isApplicationRecovered() { + return isAppRecovered; + } + } + + enum DelegationTokenRenewerEventType { + VERIFY_AND_START_APPLICATION, + FINISH_APPLICATION + } + + class DelegationTokenRenewerEvent extends + AbstractEvent { + + private ApplicationId appId; + + public DelegationTokenRenewerEvent(ApplicationId appId, + DelegationTokenRenewerEventType type) { + super(type); + this.appId = appId; + } + + public ApplicationId getApplicationId() { + return appId; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 6698412c3ed..8fe8de45470 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -172,7 +172,7 @@ public class TestAppManager{ ApplicationSubmissionContext submissionContext, String user) throws YarnException { super.submitApplication(submissionContext, System.currentTimeMillis(), - false, user); + user, false, null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 43de3b8f390..d34290e05e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1009,6 +1009,10 @@ public class TestRMRestart { MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); + // Need to wait for a while as now token renewal happens on another thread + // and is asynchronous in nature. 
+ waitForTokensToBeRenewed(rm2); + // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); @@ -1018,6 +1022,21 @@ public class TestRMRestart { rm2.stop(); } + private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { + int waitCnt = 20; + boolean atleastOneAppInNEWState = true; + while (waitCnt-- > 0 && atleastOneAppInNEWState) { + atleastOneAppInNEWState = false; + for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { + if (rmApp.getState() == RMAppState.NEW) { + Thread.sleep(1000); + atleastOneAppInNEWState = true; + break; + } + } + } + } + @Test public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 98e6ab0f1b7..a6ad9b68437 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -31,13 +31,24 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import 
java.util.concurrent.CyclicBarrier; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -46,16 +57,29 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; 
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -66,14 +90,18 @@ import org.mockito.stubbing.Answer; /** * unit test - - * tests addition/deletion/cancelation of renewals of delegation tokens + * tests addition/deletion/cancellation of renewals of delegation tokens * */ +@SuppressWarnings("rawtypes") public class TestDelegationTokenRenewer { private static final Log LOG = LogFactory.getLog(TestDelegationTokenRenewer.class); private static final Text KIND = new Text("TestDelegationTokenRenewer.Token"); + private static BlockingQueue eventQueue; + private static volatile AtomicInteger counter; + private static AsyncDispatcher dispatcher; public static class Renewer extends TokenRenewer { private static int counter = 0; private static Token lastRenewed = null; @@ -143,11 +171,20 @@ public class TestDelegationTokenRenewer { @Before public void setUp() throws Exception { + counter = new AtomicInteger(0); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + eventQueue = new LinkedBlockingQueue(); + dispatcher = new AsyncDispatcher(eventQueue); Renewer.reset(); - delegationTokenRenewer = new DelegationTokenRenewer(); + delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); delegationTokenRenewer.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + delegationTokenRenewer); + when(mockContext.getDispatcher()).thenReturn(dispatcher); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); 
InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); @@ -285,7 +322,7 @@ public class TestDelegationTokenRenewer { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTRenewal () throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); @@ -316,8 +353,9 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplication(applicationId_0, ts, true); - + delegationTokenRenewer.addApplication(applicationId_0, ts, true, false); + waitForEventsToGetProcessed(delegationTokenRenewer); + // first 3 initial renewals + 1 real int numberOfExpectedRenewals = 3+1; @@ -355,9 +393,10 @@ public class TestDelegationTokenRenewer { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, true); + delegationTokenRenewer.addApplication(applicationId_1, ts, true, false); + waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); - + waitForEventsToGetProcessed(delegationTokenRenewer); numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew @@ -377,8 +416,8 @@ public class TestDelegationTokenRenewer { } } - @Test - public void testInvalidDTWithAddApplication() throws Exception { + @Test(timeout=60000) + public void testAppRejectionWithCancelledDelegationToken() throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); @@ -390,12 +429,21 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId appId = BuilderUtils.newApplicationId(0, 0); - try { - delegationTokenRenewer.addApplication(appId, ts, true); - 
fail("App submission with a cancelled token should have failed"); - } catch (InvalidToken e) { - // expected + delegationTokenRenewer.addApplication(appId, ts, true, false); + int waitCnt = 20; + while (waitCnt-- >0) { + if (!eventQueue.isEmpty()) { + Event evt = eventQueue.take(); + if (evt.getType() == RMAppEventType.APP_REJECTED) { + Assert.assertTrue( + ((RMAppEvent) evt).getApplicationId().equals(appId)); + return; + } + } else { + Thread.sleep(500); + } } + fail("App submission with a cancelled token should have failed"); } /** @@ -408,7 +456,7 @@ public class TestDelegationTokenRenewer { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTRenewalWithNoCancel () throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); @@ -425,9 +473,10 @@ public class TestDelegationTokenRenewer { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, false); + delegationTokenRenewer.addApplication(applicationId_1, ts, false, false); + waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); - + waitForEventsToGetProcessed(delegationTokenRenewer); int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew @@ -454,9 +503,8 @@ public class TestDelegationTokenRenewer { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTKeepAlive1 () throws Exception { - DelegationTokenRenewer localDtr = new DelegationTokenRenewer(); Configuration lconf = new Configuration(conf); lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); //Keep tokens alive for 6 seconds. 
@@ -465,10 +513,15 @@ public class TestDelegationTokenRenewer { lconf.setLong( YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, 1000l); + DelegationTokenRenewer localDtr = + createNewDelegationTokenRenewer(lconf, counter); localDtr.init(lconf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + localDtr); + when(mockContext.getDispatcher()).thenReturn(dispatcher); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); @@ -487,16 +540,25 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true); + localDtr.addApplication(applicationId_0, ts, true, false); + waitForEventsToGetProcessed(localDtr); + if (!eventQueue.isEmpty()){ + Event evt = eventQueue.take(); + if (evt instanceof RMAppEvent) { + Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START); + } else { + fail("RMAppEvent.START was expected!!"); + } + } + localDtr.applicationFinished(applicationId_0); - - Thread.sleep(3000l); + waitForEventsToGetProcessed(localDtr); //Token should still be around. Renewal should not fail. token1.renew(lconf); //Allow the keepalive time to run out - Thread.sleep(6000l); + Thread.sleep(10000l); //The token should have been cancelled at this point. Renewal will fail. 
try { @@ -518,9 +580,8 @@ public class TestDelegationTokenRenewer { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTKeepAlive2() throws Exception { - DelegationTokenRenewer localDtr = new DelegationTokenRenewer(); Configuration lconf = new Configuration(conf); lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); //Keep tokens alive for 6 seconds. @@ -529,10 +590,15 @@ public class TestDelegationTokenRenewer { lconf.setLong( YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, 1000l); + DelegationTokenRenewer localDtr = + createNewDelegationTokenRenewer(conf, counter); localDtr.init(lconf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + localDtr); + when(mockContext.getDispatcher()).thenReturn(dispatcher); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); @@ -551,22 +617,18 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true); + localDtr.addApplication(applicationId_0, ts, true, false); localDtr.applicationFinished(applicationId_0); - - Thread.sleep(4000l); - + waitForEventsToGetProcessed(delegationTokenRenewer); //Send another keep alive. localDtr.updateKeepAliveApplications(Collections .singletonList(applicationId_0)); //Renewal should not fail. token1.renew(lconf); - //Token should be around after this. Thread.sleep(4500l); //Renewal should not fail. - ~1.5 seconds for keepalive timeout. token1.renew(lconf); - //Allow the keepalive time to run out Thread.sleep(3000l); //The token should have been cancelled at this point. Renewal will fail. 
@@ -575,61 +637,127 @@ public class TestDelegationTokenRenewer { fail("Renewal of cancelled token should have failed"); } catch (InvalidToken ite) {} } - - @Test(timeout=20000) - public void testConncurrentAddApplication() - throws IOException, InterruptedException, BrokenBarrierException { - final CyclicBarrier startBarrier = new CyclicBarrier(2); - final CyclicBarrier endBarrier = new CyclicBarrier(2); - // this token uses barriers to block during renew - final Credentials creds1 = new Credentials(); - final Token token1 = mock(Token.class); - creds1.addToken(new Text("token"), token1); - doReturn(true).when(token1).isManaged(); - doAnswer(new Answer() { - public Long answer(InvocationOnMock invocation) - throws InterruptedException, BrokenBarrierException { - startBarrier.await(); - endBarrier.await(); - return Long.MAX_VALUE; - }}).when(token1).renew(any(Configuration.class)); + private DelegationTokenRenewer createNewDelegationTokenRenewer( + Configuration conf, final AtomicInteger counter) { + return new DelegationTokenRenewer() { - // this dummy token fakes renewing - final Credentials creds2 = new Credentials(); - final Token token2 = mock(Token.class); - creds2.addToken(new Text("token"), token2); - doReturn(true).when(token2).isManaged(); - doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); - - // fire up the renewer - final DelegationTokenRenewer dtr = new DelegationTokenRenewer(); - dtr.init(conf); - RMContext mockContext = mock(RMContext.class); - ClientRMService mockClientRMService = mock(ClientRMService.class); - when(mockContext.getClientRMService()).thenReturn(mockClientRMService); - InetSocketAddress sockAddr = - InetSocketAddress.createUnresolved("localhost", 1234); - when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - dtr.start(); - - // submit a job that blocks during renewal - Thread submitThread = new Thread() { @Override - public void run() { - try { - 
dtr.addApplication(mock(ApplicationId.class), creds1, false); - } catch (IOException e) {} + protected ThreadPoolExecutor + createNewThreadPoolService(Configuration conf) { + ThreadPoolExecutor pool = + new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue()) { + + @Override + protected void afterExecute(Runnable r, Throwable t) { + counter.decrementAndGet(); + super.afterExecute(r, t); + } + + @Override + public void execute(Runnable command) { + counter.incrementAndGet(); + super.execute(command); + } + }; + return pool; } }; - submitThread.start(); - + } + + private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr) + throws InterruptedException { + int wait = 40; + while (wait-- > 0 + && counter.get() > 0) { + Thread.sleep(200); + } + } + + @Test(timeout=20000) + public void testConcurrentAddApplication() + throws IOException, InterruptedException, BrokenBarrierException { + final CyclicBarrier startBarrier = new CyclicBarrier(2); + final CyclicBarrier endBarrier = new CyclicBarrier(2); + + // this token uses barriers to block during renew + final Credentials creds1 = new Credentials(); + final Token token1 = mock(Token.class); + creds1.addToken(new Text("token"), token1); + doReturn(true).when(token1).isManaged(); + doAnswer(new Answer() { + public Long answer(InvocationOnMock invocation) + throws InterruptedException, BrokenBarrierException { + startBarrier.await(); + endBarrier.await(); + return Long.MAX_VALUE; + }}).when(token1).renew(any(Configuration.class)); + + // this dummy token fakes renewing + final Credentials creds2 = new Credentials(); + final Token token2 = mock(Token.class); + creds2.addToken(new Text("token"), token2); + doReturn(true).when(token2).isManaged(); + doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); + + // fire up the renewer + final DelegationTokenRenewer dtr = + createNewDelegationTokenRenewer(conf, counter); + dtr.init(conf); + RMContext mockContext = mock(RMContext.class); 
+ ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + InetSocketAddress sockAddr = + InetSocketAddress.createUnresolved("localhost", 1234); + when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + dtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); + dtr.start(); + // submit a job that blocks during renewal + Thread submitThread = new Thread() { + @Override + public void run() { + dtr.addApplication(mock(ApplicationId.class), creds1, false, false); + } + }; + submitThread.start(); + // wait till 1st submit blocks, then submit another - startBarrier.await(); - dtr.addApplication(mock(ApplicationId.class), creds2, false); - // signal 1st to complete - endBarrier.await(); - submitThread.join(); + startBarrier.await(); + dtr.addApplication(mock(ApplicationId.class), creds2, false, false); + // signal 1st to complete + endBarrier.await(); + submitThread.join(); + } + + @Test(timeout=20000) + public void testAppSubmissionWithInvalidDelegationToken() throws Exception { + Configuration conf = new Configuration(); + conf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + MockRM rm = new MockRM(conf); + ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes()); + ContainerLaunchContext amContainer = + ContainerLaunchContext.newInstance( + new HashMap(), new HashMap(), + new ArrayList(), new HashMap(), tokens, + new HashMap()); + ApplicationSubmissionContext appSubContext = + ApplicationSubmissionContext.newInstance( + ApplicationId.newInstance(1234121, 0), + "BOGUS", "default", Priority.UNDEFINED, amContainer, false, + true, 1, Resource.newInstance(1024, 1), "BOGUS"); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(appSubContext); + try { + rm.getClientRMService().submitApplication(request); + fail("Error was excepted."); + 
} catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Bad header found in token storage")); + } } }