YARN-1812. Fixed ResourceManager to synchrously renew tokens after recovery and thus recover app itself synchronously and avoid races with resyncing NodeManagers. Contributed by Jian He.

svn merge --ignore-ancestry -c 1576843 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576844 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-12 18:26:53 +00:00
parent 8c4578e894
commit 15744c614b
12 changed files with 152 additions and 76 deletions

View File

@ -438,6 +438,10 @@ Release 2.4.0 - UNRELEASED
specify host/rack requests without off-switch request. (Wangda Tan via specify host/rack requests without off-switch request. (Wangda Tan via
acmurthy) acmurthy)
YARN-1812. Fixed ResourceManager to synchrously renew tokens after recovery
and thus recover app itself synchronously and avoid races with resyncing
NodeManagers. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -337,7 +337,7 @@ public SubmitApplicationResponse submitApplication(
try { try {
// call RMAppManager to submit application directly // call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext, rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user, false, null); System.currentTimeMillis(), user);
LOG.info("Application with id " + applicationId.getId() + LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user); " submitted by user " + user);

View File

@ -263,48 +263,75 @@ protected synchronized void checkAppNumCompletedLimit() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void submitApplication( protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovered, RMState state) throws YarnException { String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application = RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user); createAndPopulateNewRMApp(submissionContext, submitTime, user);
ApplicationId appId = submissionContext.getApplicationId();
if (isRecovered) {
recoverApplication(state, application);
RMAppState rmAppState =
state.getApplicationState().get(applicationId).getState();
if (isApplicationInFinalState(rmAppState)) {
// We are synchronously moving the application into final state so that
// momentarily client will not see this application in NEW state. Also
// for finished applications we will avoid renewing tokens.
application
.handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
return;
}
}
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null; Credentials credentials = null;
try { try {
credentials = parseCredentials(submissionContext); credentials = parseCredentials(submissionContext);
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
credentials, submissionContext.getCancelTokensWhenComplete());
} catch (Exception e) { } catch (Exception e) {
LOG.warn( LOG.warn("Unable to parse credentials.", e);
"Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the // Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the // RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application // scheduler about the existence of the application
assert application.getState() == RMAppState.NEW; assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler()
new RMAppRejectedEvent(applicationId, e.getMessage())); .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e); throw RPCUtil.getRemoteException(e);
} }
this.rmContext.getDelegationTokenRenewer().addApplication(
applicationId, credentials,
submissionContext.getCancelTokensWhenComplete(), isRecovered);
} else { } else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, .handle(new RMAppEvent(applicationId, RMAppEventType.START));
isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START)); }
}
@SuppressWarnings("unchecked")
protected void
recoverApplication(ApplicationState appState, RMState rmState)
throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
ApplicationId appId = appState.getAppId();
// create and recover app.
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
application.recover(rmState);
if (isApplicationInFinalState(appState.getState())) {
// We are synchronously moving the application into final state so that
// momentarily client will not see this application in NEW state. Also
// for finished applications we will avoid renewing tokens.
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
return;
}
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null;
try {
credentials = parseCredentials(appContext);
// synchronously renew delegation token on recovery.
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
credentials, appContext.getCancelTokensWhenComplete());
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
} catch (Exception e) {
LOG.warn("Unable to parse and renew delegation tokens.", e);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(appId, e.getMessage()));
throw e;
}
} else {
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
} }
} }
@ -363,16 +390,6 @@ private void validateResourceRequest(
} }
} }
private void recoverApplication(RMState state, RMAppImpl application)
throws YarnException {
try {
application.recover(state);
} catch (Exception e) {
LOG.error("Error recovering application", e);
throw new YarnException(e);
}
}
private boolean isApplicationInFinalState(RMAppState rmAppState) { private boolean isApplicationInFinalState(RMAppState rmAppState) {
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|| rmAppState == RMAppState.KILLED) { || rmAppState == RMAppState.KILLED) {
@ -403,8 +420,7 @@ public void recover(RMState state) throws Exception {
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState(); Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications"); LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) { for (ApplicationState appState : appStates.values()) {
submitApplication(appState.getApplicationSubmissionContext(), recoverApplication(appState, state);
appState.getSubmitTime(), appState.getUser(), true, state);
} }
} }

View File

@ -731,7 +731,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
* Therefore we should wait for it to finish. * Therefore we should wait for it to finish.
*/ */
for (RMAppAttempt attempt : app.getAppAttempts().values()) { for (RMAppAttempt attempt : app.getAppAttempts().values()) {
app.dispatcher.getEventHandler().handle( // synchronously recover attempt to ensure any incoming external events
// to be processed after the attempt processes the recover event.
attempt.handle(
new RMAppAttemptEvent(attempt.getAppAttemptId(), new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.RECOVER)); RMAppAttemptEventType.RECOVER));
} }

View File

@ -114,6 +114,7 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
renewerService = createNewThreadPoolService(conf); renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>(); pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
renewalTimer = new Timer(true);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -136,7 +137,6 @@ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
dtCancelThread.start(); dtCancelThread.start();
renewalTimer = new Timer(true);
if (tokenKeepAliveEnabled) { if (tokenKeepAliveEnabled) {
delayedRemovalThread = delayedRemovalThread =
new Thread(new DelayedTokenRemovalRunnable(getConfig()), new Thread(new DelayedTokenRemovalRunnable(getConfig()),
@ -151,12 +151,12 @@ protected void serviceStart() throws Exception {
isServiceStarted = true; isServiceStarted = true;
serviceStateLock.writeLock().unlock(); serviceStateLock.writeLock().unlock();
while(!pendingEventQueue.isEmpty()) { while(!pendingEventQueue.isEmpty()) {
processDelegationTokenRewewerEvent(pendingEventQueue.take()); processDelegationTokenRenewerEvent(pendingEventQueue.take());
} }
super.serviceStart(); super.serviceStart();
} }
private void processDelegationTokenRewewerEvent( private void processDelegationTokenRenewerEvent(
DelegationTokenRenewerEvent evt) { DelegationTokenRenewerEvent evt) {
serviceStateLock.readLock().lock(); serviceStateLock.readLock().lock();
try { try {
@ -325,19 +325,26 @@ public Set<Token<?>> getDelegationTokens() {
} }
/** /**
* Add application tokens for renewal. * Asynchronously add application tokens for renewal.
* @param applicationId added application * @param applicationId added application
* @param ts tokens * @param ts tokens
* @param shouldCancelAtEnd true if tokens should be canceled when the app is * @param shouldCancelAtEnd true if tokens should be canceled when the app is
* done else false. * done else false.
* @throws IOException * @throws IOException
*/ */
public void addApplication( public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, boolean shouldCancelAtEnd) {
boolean isApplicationRecovered) { processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent( applicationId, ts, shouldCancelAtEnd));
applicationId, ts, }
shouldCancelAtEnd, isApplicationRecovered));
/**
* Synchronously renew delegation tokens.
*/
public void addApplicationSync(ApplicationId applicationId, Credentials ts,
boolean shouldCancelAtEnd) throws IOException{
handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
applicationId, ts, shouldCancelAtEnd));
} }
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
@ -493,7 +500,7 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
* @param applicationId completed application * @param applicationId completed application
*/ */
public void applicationFinished(ApplicationId applicationId) { public void applicationFinished(ApplicationId applicationId) {
processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent( processDelegationTokenRenewerEvent(new DelegationTokenRenewerEvent(
applicationId, applicationId,
DelegationTokenRenewerEventType.FINISH_APPLICATION)); DelegationTokenRenewerEventType.FINISH_APPLICATION));
} }
@ -638,9 +645,7 @@ private void handleDTRenewerAppSubmitEvent(
// Setup tokens for renewal // Setup tokens for renewal
DelegationTokenRenewer.this.handleAppSubmitEvent(event); DelegationTokenRenewer.this.handleAppSubmitEvent(event);
rmContext.getDispatcher().getEventHandler() rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(event.getApplicationId(), .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START));
event.isApplicationRecovered() ? RMAppEventType.RECOVER
: RMAppEventType.START));
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn( LOG.warn(
"Unable to add the application to the delegation token renewer.", "Unable to add the application to the delegation token renewer.",
@ -654,20 +659,17 @@ private void handleDTRenewerAppSubmitEvent(
} }
} }
class DelegationTokenRenewerAppSubmitEvent extends private static class DelegationTokenRenewerAppSubmitEvent extends
DelegationTokenRenewerEvent { DelegationTokenRenewerEvent {
private Credentials credentials; private Credentials credentials;
private boolean shouldCancelAtEnd; private boolean shouldCancelAtEnd;
private boolean isAppRecovered;
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
Credentials credentails, boolean shouldCancelAtEnd, Credentials credentails, boolean shouldCancelAtEnd) {
boolean isApplicationRecovered) {
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
this.credentials = credentails; this.credentials = credentails;
this.shouldCancelAtEnd = shouldCancelAtEnd; this.shouldCancelAtEnd = shouldCancelAtEnd;
this.isAppRecovered = isApplicationRecovered;
} }
public Credentials getCredentials() { public Credentials getCredentials() {
@ -677,10 +679,6 @@ public Credentials getCredentials() {
public boolean shouldCancelAtEnd() { public boolean shouldCancelAtEnd() {
return shouldCancelAtEnd; return shouldCancelAtEnd;
} }
public boolean isApplicationRecovered() {
return isAppRecovered;
}
} }
enum DelegationTokenRenewerEventType { enum DelegationTokenRenewerEventType {
@ -688,7 +686,7 @@ enum DelegationTokenRenewerEventType {
FINISH_APPLICATION FINISH_APPLICATION
} }
class DelegationTokenRenewerEvent extends private static class DelegationTokenRenewerEvent extends
AbstractEvent<DelegationTokenRenewerEventType> { AbstractEvent<DelegationTokenRenewerEventType> {
private ApplicationId appId; private ApplicationId appId;

View File

@ -497,7 +497,7 @@ protected void startWepApp() {
// override to disable webapp // override to disable webapp
} }
public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm,
MockAM am) throws Exception { MockAM am) throws Exception {
FinishApplicationMasterRequest req = FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance( FinishApplicationMasterRequest.newInstance(

View File

@ -30,7 +30,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -142,7 +141,7 @@ public MyRMAppManager(RMContext context, YarnScheduler scheduler,
@Override @Override
protected void submitApplication( protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovered, RMState state) throws YarnException { String user) throws YarnException {
//Do nothing, just add the application to RMContext //Do nothing, just add the application to RMContext
RMAppImpl application = RMAppImpl application =
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext, new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,

View File

@ -178,7 +178,7 @@ public void submitApplication(
ApplicationSubmissionContext submissionContext, String user) ApplicationSubmissionContext submissionContext, String user)
throws YarnException { throws YarnException {
super.submitApplication(submissionContext, System.currentTimeMillis(), super.submitApplication(submissionContext, System.currentTimeMillis(),
user, false, null); user);
} }
} }

View File

@ -414,7 +414,7 @@ public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode(); nm1.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
MockRM.finishApplicationMaster(app1, rm1, nm1, am1); MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
// a failed app // a failed app
RMApp app2 = rm1.submitApp(200); RMApp app2 = rm1.submitApp(200);

View File

@ -1709,6 +1709,63 @@ public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
rm2.stop(); rm2.stop();
} }
// Test Delegation token is renewed synchronously so that recover events
// can be processed before any other external incoming events, specifically
// the ContainerFinished event on NM re-registraton.
@Test (timeout = 20000)
public void testSynchronouslyRenewDTOnRecovery() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
final MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app0 = rm1.submitApp(200);
final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
MockRM rm2 = new MockRM(conf, memStore) {
@Override
protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
this.rmContext.getNMTokenSecretManager()) {
@Override
protected void serviceStart() throws Exception {
// send the container_finished event as soon as the
// ResourceTrackerService is started.
super.serviceStart();
nm1.setResourceTrackerService(getResourceTrackerService());
List<ContainerStatus> status = new ArrayList<ContainerStatus>();
ContainerId amContainer =
ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
status.add(ContainerStatus.newInstance(amContainer,
ContainerState.COMPLETE, "AM container exit", 143));
nm1.registerNode(status);
}
};
}
};
// Re-start RM
rm2.start();
// wait for the 2nd attempt to be started.
RMApp loadedApp0 =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
int timeoutSecs = 0;
while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
Thread.sleep(200);
}
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
}
private void writeToHostsFile(String... hosts) throws IOException { private void writeToHostsFile(String... hosts) throws IOException {
if (!hostFile.exists()) { if (!hostFile.exists()) {
TEMP_DIR.mkdirs(); TEMP_DIR.mkdirs();

View File

@ -223,7 +223,7 @@ public void testAMRestartWithExistingContainers() throws Exception {
((CapacityScheduler) rm1.getResourceScheduler()) ((CapacityScheduler) rm1.getResourceScheduler())
.getCurrentAttemptForContainer(containerId2); .getCurrentAttemptForContainer(containerId2);
// finish this application // finish this application
MockRM.finishApplicationMaster(app1, rm1, nm1, am2); MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2);
// the 2nd attempt released the 1st attempt's running container, when the // the 2nd attempt released the 1st attempt's running container, when the
// 2nd attempt finishes. // 2nd attempt finishes.

View File

@ -353,7 +353,7 @@ public void testDTRenewal () throws Exception {
// register the tokens for renewal // register the tokens for renewal
ApplicationId applicationId_0 = ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0); BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplication(applicationId_0, ts, true, false); delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
waitForEventsToGetProcessed(delegationTokenRenewer); waitForEventsToGetProcessed(delegationTokenRenewer);
// first 3 initial renewals + 1 real // first 3 initial renewals + 1 real
@ -393,7 +393,7 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, true, false); delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
waitForEventsToGetProcessed(delegationTokenRenewer); waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1); delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer); waitForEventsToGetProcessed(delegationTokenRenewer);
@ -429,7 +429,7 @@ public void testAppRejectionWithCancelledDelegationToken() throws Exception {
// register the tokens for renewal // register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0); ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplication(appId, ts, true, false); delegationTokenRenewer.addApplicationAsync(appId, ts, true);
int waitCnt = 20; int waitCnt = 20;
while (waitCnt-- >0) { while (waitCnt-- >0) {
if (!eventQueue.isEmpty()) { if (!eventQueue.isEmpty()) {
@ -473,7 +473,7 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, false, false); delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
waitForEventsToGetProcessed(delegationTokenRenewer); waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1); delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer); waitForEventsToGetProcessed(delegationTokenRenewer);
@ -540,7 +540,7 @@ public void testDTKeepAlive1 () throws Exception {
// register the tokens for renewal // register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true, false); localDtr.addApplicationAsync(applicationId_0, ts, true);
waitForEventsToGetProcessed(localDtr); waitForEventsToGetProcessed(localDtr);
if (!eventQueue.isEmpty()){ if (!eventQueue.isEmpty()){
Event evt = eventQueue.take(); Event evt = eventQueue.take();
@ -617,7 +617,7 @@ public void testDTKeepAlive2() throws Exception {
// register the tokens for renewal // register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true, false); localDtr.addApplicationAsync(applicationId_0, ts, true);
localDtr.applicationFinished(applicationId_0); localDtr.applicationFinished(applicationId_0);
waitForEventsToGetProcessed(delegationTokenRenewer); waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive. //Send another keep alive.
@ -718,14 +718,14 @@ public Long answer(InvocationOnMock invocation)
Thread submitThread = new Thread() { Thread submitThread = new Thread() {
@Override @Override
public void run() { public void run() {
dtr.addApplication(mock(ApplicationId.class), creds1, false, false); dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
} }
}; };
submitThread.start(); submitThread.start();
// wait till 1st submit blocks, then submit another // wait till 1st submit blocks, then submit another
startBarrier.await(); startBarrier.await();
dtr.addApplication(mock(ApplicationId.class), creds2, false, false); dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
// signal 1st to complete // signal 1st to complete
endBarrier.await(); endBarrier.await();
submitThread.join(); submitThread.join();