YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie). Contributed by Jian He

(cherry picked from commit 0402bada19)
This commit is contained in:
Jason Lowe 2014-12-18 23:28:18 +00:00
parent 947475c127
commit 173664d70f
4 changed files with 103 additions and 17 deletions

View File

@ -212,6 +212,9 @@ Release 2.7.0 - UNRELEASED
YARN-2944. InMemorySCMStore can not be instantiated with ReflectionUtils#newInstance.
(Chris Trezzo via kasha)
YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie)
(Jian He via jlowe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Service to renew application delegation tokens.
*/
@ -94,6 +93,9 @@ public class DelegationTokenRenewer extends AbstractService {
private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
private ConcurrentMap<Token<?>, DelegationTokenToRenew> allTokens =
new ConcurrentHashMap<Token<?>, DelegationTokenToRenew>();
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
new ConcurrentHashMap<ApplicationId, Long>();
@ -202,6 +204,7 @@ public class DelegationTokenRenewer extends AbstractService {
renewalTimer.cancel();
}
appTokens.clear();
allTokens.clear();
this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
@ -230,7 +233,7 @@ public class DelegationTokenRenewer extends AbstractService {
public final Configuration conf;
public long expirationDate;
public TimerTask timerTask;
public final boolean shouldCancelAtEnd;
public volatile boolean shouldCancelAtEnd;
public long maxDate;
public String user;
@ -407,12 +410,25 @@ public class DelegationTokenRenewer extends AbstractService {
boolean hasHdfsToken = false;
for (Token<?> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
LOG.info(applicationId + " found existing hdfs token " + token);
hasHdfsToken = true;
}
DelegationTokenToRenew dttr = allTokens.get(token);
if (dttr != null) {
// If any of the jobs sharing the same token doesn't want to cancel
// the token, we should not cancel the token.
if (!evt.shouldCancelAtEnd) {
dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
+ " for token " + dttr.token);
}
continue;
}
tokenList.add(new DelegationTokenToRenew(applicationId, token,
getConfig(), now, shouldCancelAtEnd, evt.getUser()));
}
}
@ -429,6 +445,7 @@ public class DelegationTokenRenewer extends AbstractService {
}
for (DelegationTokenToRenew dtr : tokenList) {
appTokens.get(applicationId).add(dtr);
allTokens.put(dtr.token, dtr);
setTimerForTokenRenewal(dtr);
}
}
@ -496,7 +513,6 @@ public class DelegationTokenRenewer extends AbstractService {
token.setTimerTask(tTask); // keep reference to the timer
renewalTimer.schedule(token.timerTask, new Date(renewIn));
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
+ token.applicationId);
}
@ -559,6 +575,10 @@ public class DelegationTokenRenewer extends AbstractService {
private void requestNewHdfsDelegationToken(ApplicationId applicationId,
String user, boolean shouldCancelAtEnd) throws IOException,
InterruptedException {
if (!hasProxyUserPrivileges) {
LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens.");
return;
}
// Get new hdfs tokens for this user
Credentials credentials = new Credentials();
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
@ -621,6 +641,8 @@ public class DelegationTokenRenewer extends AbstractService {
LOG.error("removing failed delegation token for appid=" + applicationId
+ ";t=" + t.token.getService());
appTokens.get(applicationId).remove(t);
allTokens.remove(t.token);
// cancel the timer
if (t.timerTask != null) {
t.timerTask.cancel();
@ -685,9 +707,14 @@ public class DelegationTokenRenewer extends AbstractService {
cancelToken(dttr);
it.remove();
allTokens.remove(dttr.token);
}
}
}
if(tokens != null && tokens.isEmpty()) {
appTokens.remove(applicationId);
}
}
/**
@ -842,4 +869,9 @@ public class DelegationTokenRenewer extends AbstractService {
return appId;
}
}
// only for testing
protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
return allTokens;
}
}

View File

@ -313,7 +313,7 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null, 0, null);
false, null, 0, null, true);
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
@ -322,7 +322,7 @@ public class MockRM extends ResourceManager {
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, attemptFailuresValidityInterval, null);
false, null, attemptFailuresValidityInterval, null, true);
}
public RMApp submitApp(int masterMemory, String name, String user,
@ -332,26 +332,24 @@ public class MockRM extends ResourceManager {
ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, 0, null);
isAppIdProvided, applicationId, 0, null, true);
}
@SuppressWarnings("deprecation")
public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, logAggregationContext);
false, null, 0, logAggregationContext, true);
}
@SuppressWarnings("deprecation")
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext)
LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@ -392,6 +390,7 @@ public class MockRM extends ResourceManager {
if (logAggregationContext != null) {
sub.setLogAggregationContext(logAggregationContext);
}
sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
@ -116,11 +118,12 @@ public class TestDelegationTokenRenewer {
private static int counter = 0;
private static Token<?> lastRenewed = null;
private static Token<?> tokenToRenewIn2Sec = null;
private static boolean cancelled = false;
private static void reset() {
counter = 0;
lastRenewed = null;
tokenToRenewIn2Sec = null;
}
@Override
@ -136,7 +139,8 @@ public class TestDelegationTokenRenewer {
@Override
public long renew(Token<?> t, Configuration conf) throws IOException {
if ( !(t instanceof MyToken)) {
return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
// renew in 3 seconds
return System.currentTimeMillis() + 3000;
}
MyToken token = (MyToken)t;
if(token.isCanceled()) {
@ -158,10 +162,13 @@ public class TestDelegationTokenRenewer {
@Override
public void cancel(Token<?> t, Configuration conf) {
cancelled = true;
if (t instanceof MyToken) {
MyToken token = (MyToken) t;
LOG.info("Cancel token " + token);
token.cancelToken();
}
}
}
@ -921,6 +928,7 @@ public class TestDelegationTokenRenewer {
// YARN will get the token for the app submitted without the delegation token.
@Test
public void testAppSubmissionWithoutDelegationToken() throws Exception {
conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
// create token2
Text userText2 = new Text("user2");
DelegationTokenIdentifier dtId2 =
@ -970,4 +978,48 @@ public class TestDelegationTokenRenewer {
appCredentials.readTokenStorageStream(buf);
Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
}
// Test submitting an application with the token obtained by a previously
// submitted application.
@Test (timeout = 30000)
public void testAppSubmissionWithPreviousToken() throws Exception{
MockRM rm = new TestSecurityMockRM(conf, null);
rm.start();
final MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
// create Token1:
Text userText1 = new Text("user");
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(userText1, new Text("renewer1"),
userText1);
final Token<DelegationTokenIdentifier> token1 =
new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
"password1".getBytes(), dtId1.getKind(), new Text("service1"));
Credentials credentials = new Credentials();
credentials.addToken(userText1, token1);
// submit app1 with a token, set cancelTokenWhenComplete to false;
RMApp app1 =
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, false);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// submit app2 with the same token, set cancelTokenWhenComplete to true;
RMApp app2 =
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
Assert.assertTrue(rm.getRMContext().getDelegationTokenRenewer()
.getAllTokens().containsKey(token1));
MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
// app2 completes, app1 is still running, check the token is not cancelled
Assert.assertFalse(Renewer.cancelled);
}
}