YARN-2770. Added functionality to renew/cancel TimeLineDelegationToken. Contributed by Zhijie Shen

(cherry picked from commit 1b4be91866)
This commit is contained in:
Jian He 2014-10-31 13:15:36 -07:00
parent 268af259b5
commit a87f827ae4
8 changed files with 329 additions and 40 deletions

View File

@ -78,10 +78,14 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
org.apache.hadoop.security.token.Token<AbstractDelegationTokenIdentifier>
delegationToken;
org.apache.hadoop.security.token.Token<AbstractDelegationTokenIdentifier>
public org.apache.hadoop.security.token.Token<AbstractDelegationTokenIdentifier>
getDelegationToken() {
return delegationToken;
}
public void setDelegationToken(
org.apache.hadoop.security.token.Token<AbstractDelegationTokenIdentifier> delegationToken) {
this.delegationToken = delegationToken;
}
}

View File

@ -396,6 +396,9 @@ Release 2.6.0 - UNRELEASED
YARN-2778. Moved node-lables' reports to the yarn nodes CLI from the admin
CLI. (Wangda Tan via vinodkv)
YARN-2770. Added functionality to renew/cancel TimeLineDelegationToken.
(Zhijie Shen via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@ -319,8 +320,14 @@ public class YarnClientImpl extends YarnClient {
@VisibleForTesting
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
getTimelineDelegationToken() throws IOException, YarnException {
return timelineClient.getDelegationToken(
UserGroupInformation.getCurrentUser().getUserName());
// Parse the RM daemon user if it exists in the config
String rmPrincipal = getConfig().get(YarnConfiguration.RM_PRINCIPAL);
String renewer = null;
if (rmPrincipal != null && rmPrincipal.length() > 0) {
HadoopKerberosName renewerKrbName = new HadoopKerberosName(rmPrincipal);
renewer = renewerKrbName.getShortName();
}
return timelineClient.getDelegationToken(renewer);
}
@Private

View File

@ -102,4 +102,34 @@ public abstract class TimelineClient extends AbstractService {
public abstract Token<TimelineDelegationTokenIdentifier> getDelegationToken(
String renewer) throws IOException, YarnException;
/**
* <p>
* Renew a timeline delegation token.
* </p>
*
* @param timelineDT
* the delegation token to renew
* @return the new expiration time
* @throws IOException
* @throws YarnException
*/
@Public
public abstract long renewDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
/**
* <p>
* Cancel a timeline delegation token.
* </p>
*
* @param timelineDT
* the delegation token to cancel
* @throws IOException
* @throws YarnException
*/
@Public
public abstract void cancelDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
}

View File

@ -68,13 +68,13 @@ import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
@ -124,6 +124,7 @@ public class TimelineClientImpl extends TimelineClient {
@Private
@VisibleForTesting
static class TimelineClientConnectionRetry {
// maxRetries < 0 means keep trying
@Private
@VisibleForTesting
@ -344,8 +345,97 @@ public class TimelineClientImpl extends TimelineClient {
@Override
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException, YarnException {
boolean isProxyAccess =
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction =
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
@Override
public Token<TimelineDelegationTokenIdentifier> run()
throws Exception {
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
return (Token) authUrl.getDelegationToken(
resURI.toURL(), token, renewer, doAsUser);
}
};
return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
}
@SuppressWarnings("unchecked")
@Override
public long renewDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException {
boolean isProxyAccess =
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
PrivilegedExceptionAction<Long> renewDTAction =
new PrivilegedExceptionAction<Long>() {
@Override
public Long run()
throws Exception {
// If the timeline DT to renew is different than cached, replace it.
// Token to set every time for retry, because when exception happens,
// DelegationTokenAuthenticatedURL will reset it to null;
if (!timelineDT.equals(token.getDelegationToken())) {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
return authUrl
.renewDelegationToken(resURI.toURL(), token, doAsUser);
}
};
return (Long) operateDelegationToken(renewDTAction);
}
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException {
boolean isProxyAccess =
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
PrivilegedExceptionAction<Void> cancelDTAction =
new PrivilegedExceptionAction<Void>() {
@Override
public Void run()
throws Exception {
// If the timeline DT to cancel is different than cached, replace it.
// Token to set every time for retry, because when exception happens,
// DelegationTokenAuthenticatedURL will reset it to null;
if (!timelineDT.equals(token.getDelegationToken())) {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
authUrl.cancelDelegationToken(resURI.toURL(), token, doAsUser);
return null;
}
};
operateDelegationToken(cancelDTAction);
}
private Object operateDelegationToken(
final PrivilegedExceptionAction<?> action)
throws IOException, YarnException {
// Set up the retry operation
TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() {
@Override
public Object run() throws IOException {
// Try pass the request, if fail, keep retrying
@ -355,25 +445,15 @@ public class TimelineClientImpl extends TimelineClient {
UserGroupInformation callerUGI = isProxyAccess ?
UserGroupInformation.getCurrentUser().getRealUser()
: UserGroupInformation.getCurrentUser();
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
try {
return callerUGI.doAs(
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
@Override
public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
return (Token) authUrl.getDelegationToken(
resURI.toURL(), token, renewer, doAsUser);
}
});
return callerUGI.doAs(action);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public boolean shouldRetryOn(Exception e) {
// Only retry on connection exceptions
@ -381,8 +461,7 @@ public class TimelineClientImpl extends TimelineClient {
}
};
return (Token<TimelineDelegationTokenIdentifier>)
connectionRetry.retryOn(tokenRetryOp);
return connectionRetry.retryOn(tokenRetryOp);
}
@Private

View File

@ -18,11 +18,17 @@
package org.apache.hadoop.yarn.security.client;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Public
@Unstable
@ -52,10 +58,50 @@ public class TimelineDelegationTokenIdentifier extends YARNDelegationTokenIdenti
}
@InterfaceAudience.Private
public static class Renewer extends Token.TrivialRenewer {
public static class Renewer extends TokenRenewer {
@Override
protected Text getKind() {
return KIND_NAME;
public boolean handleKind(Text kind) {
return KIND_NAME.equals(kind);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@SuppressWarnings("unchecked")
@Override
public long renew(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
TimelineClient client = TimelineClient.createTimelineClient();
try {
client.init(conf);
client.start();
return client.renewDelegationToken(
(Token<TimelineDelegationTokenIdentifier>) token);
} catch (YarnException e) {
throw new IOException(e);
} finally {
client.stop();
}
}
@SuppressWarnings("unchecked")
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
TimelineClient client = TimelineClient.createTimelineClient();
try {
client.init(conf);
client.start();
client.cancelDelegationToken(
(Token<TimelineDelegationTokenIdentifier>) token);
} catch (YarnException e) {
throw new IOException(e);
} finally {
client.stop();
}
}
}

View File

@ -28,15 +28,19 @@ import static org.mockito.Mockito.when;
import java.net.ConnectException;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -197,7 +201,7 @@ public class TestTimelineClient {
try {
// This call should fail because there is no timeline server
client.putEntities(generateEntity());
Assert.fail("Exception expected!"
Assert.fail("Exception expected! "
+ "Timeline server should be off to run this test. ");
} catch (RuntimeException ce) {
Assert.assertTrue(
@ -210,7 +214,7 @@ public class TestTimelineClient {
}
@Test
public void testTokenRetry() throws Exception {
public void testDelegationTokenOperationsRetry() throws Exception {
int newMaxRetries = 5;
long newIntervalMs = 500;
YarnConfiguration conf = new YarnConfiguration();
@ -222,24 +226,67 @@ public class TestTimelineClient {
// use kerberos to bypass the issue in HADOOP-11215
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.setConfiguration(conf);
TimelineClientImpl client = createTimelineClient(conf);
TestTimlineDelegationTokenSecretManager dtManager =
new TestTimlineDelegationTokenSecretManager();
try {
// try getting a delegation token
client.getDelegationToken(
UserGroupInformation.getCurrentUser().getShortUserName());
Assert.fail("Exception expected!"
+ "Timeline server should be off to run this test. ");
} catch (RuntimeException ce) {
Assert.assertTrue(
"Handler exception for reason other than retry: " + ce.toString(), ce
.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
Assert.assertTrue("Retry filter didn't perform any retries! ",
client.connectionRetry.retried);
dtManager.startThreads();
Thread.sleep(3000);
try {
// try getting a delegation token
client.getDelegationToken(
UserGroupInformation.getCurrentUser().getShortUserName());
assertFail();
} catch (RuntimeException ce) {
assertException(client, ce);
}
try {
// try renew a delegation token
TimelineDelegationTokenIdentifier timelineDT =
new TimelineDelegationTokenIdentifier(
new Text("tester"), new Text("tester"), new Text("tester"));
client.renewDelegationToken(
new Token<TimelineDelegationTokenIdentifier>(timelineDT, dtManager));
assertFail();
} catch (RuntimeException ce) {
assertException(client, ce);
}
try {
// try cancel a delegation token
TimelineDelegationTokenIdentifier timelineDT =
new TimelineDelegationTokenIdentifier(
new Text("tester"), new Text("tester"), new Text("tester"));
client.cancelDelegationToken(
new Token<TimelineDelegationTokenIdentifier>(timelineDT, dtManager));
assertFail();
} catch (RuntimeException ce) {
assertException(client, ce);
}
} finally {
client.stop();
dtManager.stopThreads();
}
}
private static void assertFail() {
Assert.fail("Exception expected! "
+ "Timeline server should be off to run this test.");
}
private void assertException(TimelineClientImpl client, RuntimeException ce) {
Assert.assertTrue(
"Handler exception for reason other than retry: " + ce.toString(), ce
.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
Assert.assertTrue("Retry filter didn't perform any retries! ",
client.connectionRetry.retried);
}
private static ClientResponse mockEntityClientResponse(
TimelineClientImpl client, ClientResponse.Status status,
boolean hasError, boolean hasRuntimeError) {
@ -324,4 +371,17 @@ public class TestTimelineClient {
return client;
}
private static class TestTimlineDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
public TestTimlineDelegationTokenSecretManager() {
super(100000, 100000, 100000, 100000);
}
@Override
public TimelineDelegationTokenIdentifier createIdentifier() {
return new TimelineDelegationTokenIdentifier();
}
}
}

View File

@ -214,7 +214,7 @@ public class TestTimelineAuthenticationFilter {
}
@Test
public void testGetDelegationToken() throws Exception {
public void testDelegationTokenOperations() throws Exception {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override
public Void call() throws Exception {
@ -227,6 +227,23 @@ public class TestTimelineAuthenticationFilter {
Assert.assertNotNull(tDT);
Assert.assertEquals(new Text(HTTP_USER), tDT.getOwner());
// Renew token
long renewTime1 = client.renewDelegationToken(token);
Thread.sleep(100);
long renewTime2 = client.renewDelegationToken(token);
Assert.assertTrue(renewTime1 < renewTime2);
// Cancel token
client.cancelDelegationToken(token);
// Renew should not be successful because the token is canceled
try {
client.renewDelegationToken(token);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(
"Renewal request for unknown token"));
}
// Let HTTP user to get the delegation token for FOO user
UserGroupInformation fooUgi = UserGroupInformation.createProxyUser(
FOO_USER, UserGroupInformation.getCurrentUser());
@ -245,6 +262,49 @@ public class TestTimelineAuthenticationFilter {
Assert.assertEquals(new Text(FOO_USER), tDT.getOwner());
Assert.assertEquals(new Text(HTTP_USER), tDT.getRealUser());
// Renew token
final Token<TimelineDelegationTokenIdentifier> tokenToRenew = token;
renewTime1 = fooUgi.doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return client.renewDelegationToken(tokenToRenew);
}
});
renewTime2 = fooUgi.doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return client.renewDelegationToken(tokenToRenew);
}
});
Assert.assertTrue(renewTime1 < renewTime2);
// Cancel token
fooUgi.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
client.cancelDelegationToken(tokenToRenew);
return null;
}
});
// Renew should not be successful because the token is canceled
try {
fooUgi.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
client.renewDelegationToken(tokenToRenew);
return null;
}
});
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(
"Renewal request for unknown token"));
}
// Let HTTP user to get the delegation token for BAR user
UserGroupInformation barUgi = UserGroupInformation.createProxyUser(
BAR_USER, UserGroupInformation.getCurrentUser());