YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie). Contributed by Jian He
(cherry picked from commit 0402bada1989258ecbfdc437cb339322a1f55a97) (cherry picked from commit 173664d70f0ed3b1852b6703d32e796778fb1c78) (cherry picked from commit 04e71db1ce9572ae0641234a02b7db5d174668fd)
This commit is contained in:
parent
1931fa5f4b
commit
570d52e53c
@ -45,6 +45,9 @@ Release 2.6.1 - UNRELEASED
|
||||
YARN-2917. Fixed potential deadlock when system.exit is called in AsyncDispatcher
|
||||
(Rohith Sharmaks via jianhe)
|
||||
|
||||
YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie)
|
||||
(Jian He via jlowe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -69,7 +69,6 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Service to renew application delegation tokens.
|
||||
*/
|
||||
@ -94,6 +93,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
|
||||
new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
|
||||
|
||||
private ConcurrentMap<Token<?>, DelegationTokenToRenew> allTokens =
|
||||
new ConcurrentHashMap<Token<?>, DelegationTokenToRenew>();
|
||||
|
||||
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
|
||||
new ConcurrentHashMap<ApplicationId, Long>();
|
||||
|
||||
@ -202,6 +204,7 @@ protected void serviceStop() {
|
||||
renewalTimer.cancel();
|
||||
}
|
||||
appTokens.clear();
|
||||
allTokens.clear();
|
||||
this.renewerService.shutdown();
|
||||
dtCancelThread.interrupt();
|
||||
try {
|
||||
@ -230,7 +233,7 @@ protected static class DelegationTokenToRenew {
|
||||
public final Configuration conf;
|
||||
public long expirationDate;
|
||||
public TimerTask timerTask;
|
||||
public final boolean shouldCancelAtEnd;
|
||||
public volatile boolean shouldCancelAtEnd;
|
||||
public long maxDate;
|
||||
public String user;
|
||||
|
||||
@ -407,12 +410,25 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
|
||||
boolean hasHdfsToken = false;
|
||||
for (Token<?> token : tokens) {
|
||||
if (token.isManaged()) {
|
||||
tokenList.add(new DelegationTokenToRenew(applicationId,
|
||||
token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
|
||||
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
||||
LOG.info(applicationId + " found existing hdfs token " + token);
|
||||
hasHdfsToken = true;
|
||||
}
|
||||
|
||||
DelegationTokenToRenew dttr = allTokens.get(token);
|
||||
if (dttr != null) {
|
||||
// If any of the jobs sharing the same token doesn't want to cancel
|
||||
// the token, we should not cancel the token.
|
||||
if (!evt.shouldCancelAtEnd) {
|
||||
dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
|
||||
LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
|
||||
+ " for token " + dttr.token);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
tokenList.add(new DelegationTokenToRenew(applicationId, token,
|
||||
getConfig(), now, shouldCancelAtEnd, evt.getUser()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -429,6 +445,7 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
|
||||
}
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
appTokens.get(applicationId).add(dtr);
|
||||
allTokens.put(dtr.token, dtr);
|
||||
setTimerForTokenRenewal(dtr);
|
||||
}
|
||||
}
|
||||
@ -496,7 +513,6 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
||||
token.setTimerTask(tTask); // keep reference to the timer
|
||||
|
||||
renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
||||
|
||||
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
|
||||
+ token.applicationId);
|
||||
}
|
||||
@ -559,6 +575,10 @@ private void requestNewHdfsDelegationTokenIfNeeded(
|
||||
private void requestNewHdfsDelegationToken(ApplicationId applicationId,
|
||||
String user, boolean shouldCancelAtEnd) throws IOException,
|
||||
InterruptedException {
|
||||
if (!hasProxyUserPrivileges) {
|
||||
LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens.");
|
||||
return;
|
||||
}
|
||||
// Get new hdfs tokens for this user
|
||||
Credentials credentials = new Credentials();
|
||||
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
|
||||
@ -621,6 +641,8 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
|
||||
LOG.error("removing failed delegation token for appid=" + applicationId
|
||||
+ ";t=" + t.token.getService());
|
||||
appTokens.get(applicationId).remove(t);
|
||||
allTokens.remove(t.token);
|
||||
|
||||
// cancel the timer
|
||||
if (t.timerTask != null) {
|
||||
t.timerTask.cancel();
|
||||
@ -685,9 +707,14 @@ private void removeApplicationFromRenewal(ApplicationId applicationId) {
|
||||
cancelToken(dttr);
|
||||
|
||||
it.remove();
|
||||
allTokens.remove(dttr.token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(tokens != null && tokens.isEmpty()) {
|
||||
appTokens.remove(applicationId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -842,4 +869,9 @@ public ApplicationId getApplicationId() {
|
||||
return appId;
|
||||
}
|
||||
}
|
||||
|
||||
// only for testing
|
||||
protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
|
||||
return allTokens;
|
||||
}
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
boolean waitForAccepted, boolean keepContainers) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
false, null, 0, null);
|
||||
false, null, 0, null, true);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
|
||||
@ -322,7 +322,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
|
||||
.getShortUserName(), null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
||||
false, null, attemptFailuresValidityInterval, null);
|
||||
false, null, attemptFailuresValidityInterval, null, true);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
@ -332,26 +332,24 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
ApplicationId applicationId) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
isAppIdProvided, applicationId, 0, null);
|
||||
isAppIdProvided, applicationId, 0, null, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public RMApp submitApp(int masterMemory,
|
||||
LogAggregationContext logAggregationContext) throws Exception {
|
||||
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
||||
.getShortUserName(), null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
||||
false, null, 0, logAggregationContext);
|
||||
false, null, 0, logAggregationContext, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts, Credentials ts, String appType,
|
||||
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
||||
ApplicationId applicationId, long attemptFailuresValidityInterval,
|
||||
LogAggregationContext logAggregationContext)
|
||||
LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete)
|
||||
throws Exception {
|
||||
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
||||
ApplicationClientProtocol client = getClientRMService();
|
||||
@ -392,6 +390,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
if (logAggregationContext != null) {
|
||||
sub.setLogAggregationContext(logAggregationContext);
|
||||
}
|
||||
sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
|
||||
req.setApplicationSubmissionContext(sub);
|
||||
UserGroupInformation fakeUser =
|
||||
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
|
||||
|
@ -79,6 +79,7 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
@ -86,6 +87,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
@ -116,11 +118,12 @@ public static class Renewer extends TokenRenewer {
|
||||
private static int counter = 0;
|
||||
private static Token<?> lastRenewed = null;
|
||||
private static Token<?> tokenToRenewIn2Sec = null;
|
||||
|
||||
private static boolean cancelled = false;
|
||||
private static void reset() {
|
||||
counter = 0;
|
||||
lastRenewed = null;
|
||||
tokenToRenewIn2Sec = null;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -136,7 +139,8 @@ public boolean isManaged(Token<?> token) throws IOException {
|
||||
@Override
|
||||
public long renew(Token<?> t, Configuration conf) throws IOException {
|
||||
if ( !(t instanceof MyToken)) {
|
||||
return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
|
||||
// renew in 3 seconds
|
||||
return System.currentTimeMillis() + 3000;
|
||||
}
|
||||
MyToken token = (MyToken)t;
|
||||
if(token.isCanceled()) {
|
||||
@ -158,9 +162,12 @@ public long renew(Token<?> t, Configuration conf) throws IOException {
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> t, Configuration conf) {
|
||||
MyToken token = (MyToken)t;
|
||||
LOG.info("Cancel token " + token);
|
||||
token.cancelToken();
|
||||
cancelled = true;
|
||||
if (t instanceof MyToken) {
|
||||
MyToken token = (MyToken) t;
|
||||
LOG.info("Cancel token " + token);
|
||||
token.cancelToken();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -921,6 +928,7 @@ public Boolean get() {
|
||||
// YARN will get the token for the app submitted without the delegation token.
|
||||
@Test
|
||||
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
|
||||
// create token2
|
||||
Text userText2 = new Text("user2");
|
||||
DelegationTokenIdentifier dtId2 =
|
||||
@ -970,4 +978,48 @@ public Boolean get() {
|
||||
appCredentials.readTokenStorageStream(buf);
|
||||
Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
|
||||
}
|
||||
|
||||
// Test submitting an application with the token obtained by a previously
|
||||
// submitted application.
|
||||
@Test (timeout = 30000)
|
||||
public void testAppSubmissionWithPreviousToken() throws Exception{
|
||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
||||
rm.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// create Token1:
|
||||
Text userText1 = new Text("user");
|
||||
DelegationTokenIdentifier dtId1 =
|
||||
new DelegationTokenIdentifier(userText1, new Text("renewer1"),
|
||||
userText1);
|
||||
final Token<DelegationTokenIdentifier> token1 =
|
||||
new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
|
||||
"password1".getBytes(), dtId1.getKind(), new Text("service1"));
|
||||
|
||||
Credentials credentials = new Credentials();
|
||||
credentials.addToken(userText1, token1);
|
||||
|
||||
// submit app1 with a token, set cancelTokenWhenComplete to false;
|
||||
RMApp app1 =
|
||||
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
|
||||
null, true, false, false, null, 0, null, false);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||
|
||||
// submit app2 with the same token, set cancelTokenWhenComplete to true;
|
||||
RMApp app2 =
|
||||
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
|
||||
null, true, false, false, null, 0, null, true);
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
|
||||
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
|
||||
MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
|
||||
Assert.assertTrue(rm.getRMContext().getDelegationTokenRenewer()
|
||||
.getAllTokens().containsKey(token1));
|
||||
|
||||
MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
|
||||
// app2 completes, app1 is still running, check the token is not cancelled
|
||||
Assert.assertFalse(Renewer.cancelled);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user