YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token renewal of applications part of a bigger workflow. Contributed by Daryn Sharp.

(cherry picked from commit 9c5911294e)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-04-09 13:08:53 -07:00
parent 8dac245920
commit 1ff3fd33ed
3 changed files with 173 additions and 54 deletions

View File

@ -867,6 +867,9 @@ Release 2.7.0 - UNRELEASED
YARN-3466. Fix RM nodes web page to sort by node HTTP-address, #containers YARN-3466. Fix RM nodes web page to sort by node HTTP-address, #containers
and node-label column (Jason Lowe via wangda) and node-label column (Jason Lowe via wangda)
YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token
renewal of applications part of a bigger workflow. (Daryn Sharp via vinodkv)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -229,15 +230,16 @@ public class DelegationTokenRenewer extends AbstractService {
@VisibleForTesting @VisibleForTesting
protected static class DelegationTokenToRenew { protected static class DelegationTokenToRenew {
public final Token<?> token; public final Token<?> token;
public final ApplicationId applicationId; public final Collection<ApplicationId> referringAppIds;
public final Configuration conf; public final Configuration conf;
public long expirationDate; public long expirationDate;
public TimerTask timerTask; public RenewalTimerTask timerTask;
public volatile boolean shouldCancelAtEnd; public volatile boolean shouldCancelAtEnd;
public long maxDate; public long maxDate;
public String user; public String user;
public DelegationTokenToRenew(ApplicationId jId, Token<?> token, public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
Token<?> token,
Configuration conf, long expirationDate, boolean shouldCancelAtEnd, Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
String user) { String user) {
this.token = token; this.token = token;
@ -251,20 +253,33 @@ public class DelegationTokenRenewer extends AbstractService {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
} }
} }
this.applicationId = jId; this.referringAppIds = Collections.synchronizedSet(
new HashSet<ApplicationId>(applicationIds));
this.conf = conf; this.conf = conf;
this.expirationDate = expirationDate; this.expirationDate = expirationDate;
this.timerTask = null; this.timerTask = null;
this.shouldCancelAtEnd = shouldCancelAtEnd; this.shouldCancelAtEnd = shouldCancelAtEnd;
} }
public void setTimerTask(TimerTask tTask) { public void setTimerTask(RenewalTimerTask tTask) {
timerTask = tTask; timerTask = tTask;
} }
@VisibleForTesting
public void cancelTimer() {
if (timerTask != null) {
timerTask.cancel();
}
}
@VisibleForTesting
public boolean isTimerCancelled() {
return (timerTask != null) && timerTask.cancelled.get();
}
@Override @Override
public String toString() { public String toString() {
return token + ";exp=" + expirationDate; return token + ";exp=" + expirationDate + "; apps=" + referringAppIds;
} }
@Override @Override
@ -415,19 +430,16 @@ public class DelegationTokenRenewer extends AbstractService {
} }
DelegationTokenToRenew dttr = allTokens.get(token); DelegationTokenToRenew dttr = allTokens.get(token);
if (dttr != null) { if (dttr == null) {
// If any of the jobs sharing the same token doesn't want to cancel dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
// the token, we should not cancel the token. getConfig(), now, shouldCancelAtEnd, evt.getUser());
if (!evt.shouldCancelAtEnd) { try {
dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd; renewToken(dttr);
LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd } catch (IOException ioe) {
+ " for token " + dttr.token); throw new IOException("Failed to renew token: " + dttr.token, ioe);
} }
continue;
} }
tokenList.add(dttr);
tokenList.add(new DelegationTokenToRenew(applicationId, token,
getConfig(), now, shouldCancelAtEnd, evt.getUser()));
} }
} }
@ -436,21 +448,21 @@ public class DelegationTokenRenewer extends AbstractService {
// If user provides incorrect token then it should not be added for // If user provides incorrect token then it should not be added for
// renewal. // renewal.
for (DelegationTokenToRenew dtr : tokenList) { for (DelegationTokenToRenew dtr : tokenList) {
try { DelegationTokenToRenew currentDtr =
renewToken(dtr); allTokens.putIfAbsent(dtr.token, dtr);
} catch (IOException ioe) { if (currentDtr != null) {
throw new IOException("Failed to renew token: " + dtr.token, ioe); // another job beat us
currentDtr.referringAppIds.add(applicationId);
appTokens.get(applicationId).add(currentDtr);
} else {
appTokens.get(applicationId).add(dtr);
setTimerForTokenRenewal(dtr);
} }
} }
for (DelegationTokenToRenew dtr : tokenList) {
appTokens.get(applicationId).add(dtr);
allTokens.put(dtr.token, dtr);
setTimerForTokenRenewal(dtr);
}
} }
if (!hasHdfsToken) { if (!hasHdfsToken) {
requestNewHdfsDelegationToken(applicationId, evt.getUser(), requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
shouldCancelAtEnd); shouldCancelAtEnd);
} }
} }
@ -478,7 +490,7 @@ public class DelegationTokenRenewer extends AbstractService {
try { try {
requestNewHdfsDelegationTokenIfNeeded(dttr); requestNewHdfsDelegationTokenIfNeeded(dttr);
// if the token is not replaced by a new token, renew the token // if the token is not replaced by a new token, renew the token
if (appTokens.get(dttr.applicationId).contains(dttr)) { if (!dttr.isTimerCancelled()) {
renewToken(dttr); renewToken(dttr);
setTimerForTokenRenewal(dttr);// set the next one setTimerForTokenRenewal(dttr);// set the next one
} else { } else {
@ -508,12 +520,12 @@ public class DelegationTokenRenewer extends AbstractService {
long expiresIn = token.expirationDate - System.currentTimeMillis(); long expiresIn = token.expirationDate - System.currentTimeMillis();
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
// need to create new task every time // need to create new task every time
TimerTask tTask = new RenewalTimerTask(token); RenewalTimerTask tTask = new RenewalTimerTask(token);
token.setTimerTask(tTask); // keep reference to the timer token.setTimerTask(tTask); // keep reference to the timer
renewalTimer.schedule(token.timerTask, new Date(renewIn)); renewalTimer.schedule(token.timerTask, new Date(renewIn));
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = " LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
+ token.applicationId); + token.referringAppIds);
} }
// renew a token // renew a token
@ -535,7 +547,7 @@ public class DelegationTokenRenewer extends AbstractService {
throw new IOException(e); throw new IOException(e);
} }
LOG.info("Renewed delegation-token= [" + dttr + "], for " LOG.info("Renewed delegation-token= [" + dttr + "], for "
+ dttr.applicationId); + dttr.referringAppIds);
} }
// Request new hdfs token if the token is about to expire, and remove the old // Request new hdfs token if the token is about to expire, and remove the old
@ -548,30 +560,37 @@ public class DelegationTokenRenewer extends AbstractService {
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
final Collection<ApplicationId> applicationIds;
synchronized (dttr.referringAppIds) {
applicationIds = new HashSet<>(dttr.referringAppIds);
dttr.referringAppIds.clear();
}
// remove all old expiring hdfs tokens for this application. // remove all old expiring hdfs tokens for this application.
Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId); for (ApplicationId appId : applicationIds) {
if (tokenSet != null && !tokenSet.isEmpty()) { Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
if (tokenSet == null || tokenSet.isEmpty()) {
continue;
}
Iterator<DelegationTokenToRenew> iter = tokenSet.iterator(); Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
synchronized (tokenSet) { synchronized (tokenSet) {
while (iter.hasNext()) { while (iter.hasNext()) {
DelegationTokenToRenew t = iter.next(); DelegationTokenToRenew t = iter.next();
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
iter.remove(); iter.remove();
if (t.timerTask != null) { t.cancelTimer();
t.timerTask.cancel();
}
LOG.info("Removed expiring token " + t); LOG.info("Removed expiring token " + t);
} }
} }
} }
} }
LOG.info("Token= (" + dttr + ") is expiring, request new token."); LOG.info("Token= (" + dttr + ") is expiring, request new token.");
requestNewHdfsDelegationToken(dttr.applicationId, dttr.user, requestNewHdfsDelegationToken(applicationIds, dttr.user,
dttr.shouldCancelAtEnd); dttr.shouldCancelAtEnd);
} }
} }
private void requestNewHdfsDelegationToken(ApplicationId applicationId, private void requestNewHdfsDelegationToken(
Collection<ApplicationId> referringAppIds,
String user, boolean shouldCancelAtEnd) throws IOException, String user, boolean shouldCancelAtEnd) throws IOException,
InterruptedException { InterruptedException {
if (!hasProxyUserPrivileges) { if (!hasProxyUserPrivileges) {
@ -583,18 +602,20 @@ public class DelegationTokenRenewer extends AbstractService {
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials); Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
// Add new tokens to the toRenew list. // Add new tokens to the toRenew list.
LOG.info("Received new tokens for " + applicationId + ". Received " LOG.info("Received new tokens for " + referringAppIds + ". Received "
+ newTokens.length + " tokens."); + newTokens.length + " tokens.");
if (newTokens.length > 0) { if (newTokens.length > 0) {
for (Token<?> token : newTokens) { for (Token<?> token : newTokens) {
if (token.isManaged()) { if (token.isManaged()) {
DelegationTokenToRenew tokenToRenew = DelegationTokenToRenew tokenToRenew =
new DelegationTokenToRenew(applicationId, token, getConfig(), new DelegationTokenToRenew(referringAppIds, token, getConfig(),
Time.now(), shouldCancelAtEnd, user); Time.now(), shouldCancelAtEnd, user);
// renew the token to get the next expiration date. // renew the token to get the next expiration date.
renewToken(tokenToRenew); renewToken(tokenToRenew);
setTimerForTokenRenewal(tokenToRenew); setTimerForTokenRenewal(tokenToRenew);
appTokens.get(applicationId).add(tokenToRenew); for (ApplicationId applicationId : referringAppIds) {
appTokens.get(applicationId).add(tokenToRenew);
}
LOG.info("Received new token " + token); LOG.info("Received new token " + token);
} }
} }
@ -602,7 +623,9 @@ public class DelegationTokenRenewer extends AbstractService {
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob); credentials.writeTokenStorageToStream(dob);
ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); for (ApplicationId applicationId : referringAppIds) {
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
}
} }
@VisibleForTesting @VisibleForTesting
@ -644,16 +667,18 @@ public class DelegationTokenRenewer extends AbstractService {
* removing failed DT * removing failed DT
*/ */
private void removeFailedDelegationToken(DelegationTokenToRenew t) { private void removeFailedDelegationToken(DelegationTokenToRenew t) {
ApplicationId applicationId = t.applicationId; Collection<ApplicationId> applicationIds = t.referringAppIds;
LOG.error("removing failed delegation token for appid=" + applicationId synchronized (applicationIds) {
+ ";t=" + t.token.getService()); LOG.error("removing failed delegation token for appid=" + applicationIds
appTokens.get(applicationId).remove(t); + ";t=" + t.token.getService());
for (ApplicationId applicationId : applicationIds) {
appTokens.get(applicationId).remove(t);
}
}
allTokens.remove(t.token); allTokens.remove(t.token);
// cancel the timer // cancel the timer
if (t.timerTask != null) { t.cancelTimer();
t.timerTask.cancel();
}
} }
/** /**
@ -706,9 +731,15 @@ public class DelegationTokenRenewer extends AbstractService {
+ "; token=" + dttr.token.getService()); + "; token=" + dttr.token.getService());
} }
// continue if the app list isn't empty
synchronized(dttr.referringAppIds) {
dttr.referringAppIds.remove(applicationId);
if (!dttr.referringAppIds.isEmpty()) {
continue;
}
}
// cancel the timer // cancel the timer
if (dttr.timerTask != null) dttr.cancelTimer();
dttr.timerTask.cancel();
// cancel the token // cancel the token
cancelToken(dttr); cancelToken(dttr);

View File

@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -123,7 +124,7 @@ public class TestDelegationTokenRenewer {
counter = 0; counter = 0;
lastRenewed = null; lastRenewed = null;
tokenToRenewIn2Sec = null; tokenToRenewIn2Sec = null;
cancelled = false;
} }
@Override @Override
@ -1046,4 +1047,88 @@ public class TestDelegationTokenRenewer {
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials); delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
Assert.assertEquals(oldCounter, MyFS.getInstanceCounter()); Assert.assertEquals(oldCounter, MyFS.getInstanceCounter());
} }
// Test submitting an application with the token obtained by a previously
// submitted application that is set to be cancelled. Token should be
// renewed while all apps are running, and then cancelled when all apps
// complete
@Test (timeout = 30000)
public void testCancelWithMultipleAppSubmissions() throws Exception{
MockRM rm = new TestSecurityMockRM(conf, null);
rm.start();
final MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
//MyFS fs = (MyFS)FileSystem.get(conf);
//MyToken token1 = fs.getDelegationToken("user123");
// create Token1:
Text userText1 = new Text("user");
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(userText1, new Text("renewer1"),
userText1);
final Token<DelegationTokenIdentifier> token1 =
new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
"password1".getBytes(), dtId1.getKind(), new Text("service1"));
Credentials credentials = new Credentials();
credentials.addToken(token1.getService(), token1);
DelegationTokenRenewer renewer =
rm.getRMContext().getDelegationTokenRenewer();
Assert.assertTrue(renewer.getAllTokens().isEmpty());
Assert.assertFalse(Renewer.cancelled);
RMApp app1 =
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
DelegationTokenToRenew dttr = renewer.getAllTokens().get(token1);
Assert.assertNotNull(dttr);
Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
RMApp app2 =
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
Assert.assertFalse(Renewer.cancelled);
MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
// app2 completes, app1 is still running, check the token is not cancelled
Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
Assert.assertFalse(dttr.referringAppIds.contains(app2.getApplicationId()));
Assert.assertFalse(dttr.isTimerCancelled());
Assert.assertFalse(Renewer.cancelled);
RMApp app3 =
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true);
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1);
rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
Assert.assertFalse(dttr.isTimerCancelled());
Assert.assertFalse(Renewer.cancelled);
MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
Assert.assertFalse(dttr.referringAppIds.contains(app1.getApplicationId()));
Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
Assert.assertFalse(dttr.isTimerCancelled());
Assert.assertFalse(Renewer.cancelled);
MockRM.finishAMAndVerifyAppState(app3, rm, nm1, am3);
Assert.assertFalse(renewer.getAllTokens().containsKey(token1));
Assert.assertTrue(dttr.referringAppIds.isEmpty());
Assert.assertTrue(dttr.isTimerCancelled());
Assert.assertTrue(Renewer.cancelled);
}
} }