YARN-10348. Allow RM to always cancel tokens after app completes. Contributed by
Jim Brennan.
This commit is contained in:
parent
52f2303b5a
commit
09f1547697
|
@ -729,6 +729,9 @@ public class YarnConfiguration extends Configuration {
|
||||||
RM_PREFIX + "delegation-token.max-conf-size-bytes";
|
RM_PREFIX + "delegation-token.max-conf-size-bytes";
|
||||||
public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
|
public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
|
||||||
12800;
|
12800;
|
||||||
|
public static final String RM_DELEGATION_TOKEN_ALWAYS_CANCEL =
|
||||||
|
RM_PREFIX + "delegation-token.always-cancel";
|
||||||
|
public static final boolean DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL = false;
|
||||||
|
|
||||||
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
|
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
|
||||||
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
|
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
|
||||||
|
|
|
@ -791,6 +791,16 @@
|
||||||
<value>12800</value>
|
<value>12800</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>If true, ResourceManager will always try to cancel delegation
|
||||||
|
tokens after the application completes, even if the client sets
|
||||||
|
shouldCancelAtEnd false. References to delegation tokens are tracked,
|
||||||
|
so they will not be canceled until all sub-tasks are done using them.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.delegation-token.always-cancel</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>If true, ResourceManager will have proxy-user privileges.
|
<description>If true, ResourceManager will have proxy-user privileges.
|
||||||
Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
|
Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
|
||||||
|
|
|
@ -110,6 +110,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
private volatile boolean isServiceStarted;
|
private volatile boolean isServiceStarted;
|
||||||
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
|
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
|
||||||
|
|
||||||
|
private boolean alwaysCancelDelegationTokens;
|
||||||
private boolean tokenKeepAliveEnabled;
|
private boolean tokenKeepAliveEnabled;
|
||||||
private boolean hasProxyUserPrivileges;
|
private boolean hasProxyUserPrivileges;
|
||||||
private long credentialsValidTimeRemaining;
|
private long credentialsValidTimeRemaining;
|
||||||
|
@ -126,6 +127,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
this.alwaysCancelDelegationTokens =
|
||||||
|
conf.getBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
|
||||||
|
YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL);
|
||||||
this.hasProxyUserPrivileges =
|
this.hasProxyUserPrivileges =
|
||||||
conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
|
conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
|
YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
|
||||||
|
@ -239,7 +243,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected static class DelegationTokenToRenew {
|
protected class DelegationTokenToRenew {
|
||||||
public final Token<?> token;
|
public final Token<?> token;
|
||||||
public final Collection<ApplicationId> referringAppIds;
|
public final Collection<ApplicationId> referringAppIds;
|
||||||
public final Configuration conf;
|
public final Configuration conf;
|
||||||
|
@ -269,7 +273,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.expirationDate = expirationDate;
|
this.expirationDate = expirationDate;
|
||||||
this.timerTask = null;
|
this.timerTask = null;
|
||||||
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
this.shouldCancelAtEnd = shouldCancelAtEnd | alwaysCancelDelegationTokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTimerTask(RenewalTimerTask tTask) {
|
public void setTimerTask(RenewalTimerTask tTask) {
|
||||||
|
|
|
@ -201,6 +201,8 @@ public class TestDelegationTokenRenewer {
|
||||||
counter = new AtomicInteger(0);
|
counter = new AtomicInteger(0);
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
|
||||||
|
false);
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
eventQueue = new LinkedBlockingQueue<Event>();
|
eventQueue = new LinkedBlockingQueue<Event>();
|
||||||
dispatcher = new AsyncDispatcher(eventQueue);
|
dispatcher = new AsyncDispatcher(eventQueue);
|
||||||
|
@ -556,6 +558,76 @@ public class TestDelegationTokenRenewer {
|
||||||
token1.renew(conf);
|
token1.renew(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Basic idea of the test:
|
||||||
|
* 1. Verify that YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL = true
|
||||||
|
* overrides shouldCancelAtEnd
|
||||||
|
* 2. register a token for 2 seconds with shouldCancelAtEnd = false
|
||||||
|
* 3. cancel it immediately
|
||||||
|
* 4. check that token was canceled
|
||||||
|
* @throws IOException
|
||||||
|
* @throws URISyntaxException
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testDTRenewalWithNoCancelAlwaysCancel() throws Exception {
|
||||||
|
Configuration lconf = new Configuration(conf);
|
||||||
|
lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
|
||||||
|
true);
|
||||||
|
|
||||||
|
DelegationTokenRenewer localDtr =
|
||||||
|
createNewDelegationTokenRenewer(lconf, counter);
|
||||||
|
RMContext mockContext = mock(RMContext.class);
|
||||||
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
|
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||||
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||||
|
localDtr);
|
||||||
|
when(mockContext.getDispatcher()).thenReturn(dispatcher);
|
||||||
|
InetSocketAddress sockAddr =
|
||||||
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
|
localDtr.setRMContext(mockContext);
|
||||||
|
localDtr.init(lconf);
|
||||||
|
localDtr.start();
|
||||||
|
|
||||||
|
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
||||||
|
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
|
||||||
|
|
||||||
|
Credentials ts = new Credentials();
|
||||||
|
MyToken token1 = dfs.getDelegationToken("user1");
|
||||||
|
|
||||||
|
//to cause this one to be set for renew in 2 secs
|
||||||
|
Renewer.tokenToRenewIn2Sec = token1;
|
||||||
|
LOG.info("token="+token1+" should be renewed for 2 secs");
|
||||||
|
|
||||||
|
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
|
||||||
|
ts.addToken(new Text(nn1), token1);
|
||||||
|
|
||||||
|
ApplicationId applicationId = BuilderUtils.newApplicationId(0, 1);
|
||||||
|
localDtr.addApplicationAsync(applicationId, ts, false, "user",
|
||||||
|
new Configuration());
|
||||||
|
waitForEventsToGetProcessed(localDtr);
|
||||||
|
localDtr.applicationFinished(applicationId);
|
||||||
|
waitForEventsToGetProcessed(localDtr);
|
||||||
|
|
||||||
|
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
|
||||||
|
try {
|
||||||
|
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
|
||||||
|
} catch (InterruptedException e) {}
|
||||||
|
LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
|
||||||
|
|
||||||
|
// counter and the token should still be the old ones
|
||||||
|
assertEquals("renew wasn't called as many times as expected",
|
||||||
|
numberOfExpectedRenewals, Renewer.counter);
|
||||||
|
|
||||||
|
// The token should have been cancelled at this point. Renewal will fail.
|
||||||
|
try {
|
||||||
|
token1.renew(lconf);
|
||||||
|
fail("Renewal of cancelled token should have failed");
|
||||||
|
} catch (InvalidToken ite) {}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Basic idea of the test:
|
* Basic idea of the test:
|
||||||
* 0. Setup token KEEP_ALIVE
|
* 0. Setup token KEEP_ALIVE
|
||||||
|
|
Loading…
Reference in New Issue