YARN-3287. Made TimelineClient put methods do as the correct login context. Contributed by Daryn Sharp and Jonathan Eagles.

(cherry picked from commit d6e05c5ee2)
This commit is contained in:
Zhijie Shen 2015-03-09 13:54:36 -07:00
parent bbaa1344a1
commit 6527be374b
3 changed files with 144 additions and 180 deletions

View File

@ -683,6 +683,9 @@ Release 2.7.0 - UNRELEASED
YARN-3275. CapacityScheduler: Preemption happening on non-preemptable YARN-3275. CapacityScheduler: Preemption happening on non-preemptable
queues (Eric Payne via jlowe) queues (Eric Payne via jlowe)
YARN-3287. Made TimelineClient put methods do as the correct login context.
(Daryn Sharp and Jonathan Eagles via zjshen)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -108,6 +109,8 @@ public class TimelineClientImpl extends TimelineClient {
private DelegationTokenAuthenticator authenticator; private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token; private DelegationTokenAuthenticatedURL.Token token;
private URI resURI; private URI resURI;
private UserGroupInformation authUgi;
private String doAsUser;
@Private @Private
@VisibleForTesting @VisibleForTesting
@ -252,6 +255,15 @@ public class TimelineClientImpl extends TimelineClient {
} }
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
authUgi = realUgi;
doAsUser = ugi.getShortUserName();
} else {
authUgi = ugi;
doAsUser = null;
}
ClientConfig cc = new DefaultClientConfig(); ClientConfig cc = new DefaultClientConfig();
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
connConfigurator = newConnConfigurator(conf); connConfigurator = newConnConfigurator(conf);
@ -301,16 +313,20 @@ public class TimelineClientImpl extends TimelineClient {
doPosting(domain, "domain"); doPosting(domain, "domain");
} }
private ClientResponse doPosting(Object obj, String path) throws IOException, YarnException { private ClientResponse doPosting(final Object obj, final String path)
throws IOException, YarnException {
ClientResponse resp; ClientResponse resp;
try { try {
resp = doPostingObject(obj, path); resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
} catch (RuntimeException re) { @Override
// runtime exception is expected if the client cannot connect the server public ClientResponse run() throws Exception {
String msg = return doPostingObject(obj, path);
"Failed to get the response from the timeline server."; }
LOG.error(msg, re); });
throw re; } catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException ie) {
throw new IOException(ie);
} }
if (resp == null || if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) { resp.getClientResponseStatus() != ClientResponse.Status.OK) {
@ -331,11 +347,6 @@ public class TimelineClientImpl extends TimelineClient {
@Override @Override
public Token<TimelineDelegationTokenIdentifier> getDelegationToken( public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException, YarnException { final String renewer) throws IOException, YarnException {
boolean isProxyAccess =
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction = PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction =
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() { new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
@ -357,11 +368,6 @@ public class TimelineClientImpl extends TimelineClient {
public long renewDelegationToken( public long renewDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT) final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException { throws IOException, YarnException {
boolean isProxyAccess =
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
boolean useHttps = YarnConfiguration.useHttps(this.getConfig()); boolean useHttps = YarnConfiguration.useHttps(this.getConfig());
final String scheme = useHttps ? "https" : "http"; final String scheme = useHttps ? "https" : "http";
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(timelineDT); final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(timelineDT);
@ -393,11 +399,6 @@ public class TimelineClientImpl extends TimelineClient {
public void cancelDelegationToken( public void cancelDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT) final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException { throws IOException, YarnException {
boolean isProxyAccess =
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
boolean useHttps = YarnConfiguration.useHttps(this.getConfig()); boolean useHttps = YarnConfiguration.useHttps(this.getConfig());
final String scheme = useHttps ? "https" : "http"; final String scheme = useHttps ? "https" : "http";
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(timelineDT); final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(timelineDT);
@ -433,15 +434,9 @@ public class TimelineClientImpl extends TimelineClient {
@Override @Override
public Object run() throws IOException { public Object run() throws IOException {
// Try pass the request, if fail, keep retrying // Try pass the request, if fail, keep retrying
boolean isProxyAccess = authUgi.checkTGTAndReloginFromKeytab();
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
UserGroupInformation callerUGI = isProxyAccess ?
UserGroupInformation.getCurrentUser().getRealUser()
: UserGroupInformation.getCurrentUser();
callerUGI.checkTGTAndReloginFromKeytab();
try { try {
return callerUGI.doAs(action); return authUgi.doAs(action);
} catch (UndeclaredThrowableException e) { } catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause()); throw new IOException(e.getCause());
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -481,28 +476,15 @@ public class TimelineClientImpl extends TimelineClient {
@Override @Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
boolean isProxyAccess = authUgi.checkTGTAndReloginFromKeytab();
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.PROXY;
UserGroupInformation callerUGI = isProxyAccess ?
UserGroupInformation.getCurrentUser().getRealUser()
: UserGroupInformation.getCurrentUser();
final String doAsUser = isProxyAccess ?
UserGroupInformation.getCurrentUser().getShortUserName() : null;
callerUGI.checkTGTAndReloginFromKeytab();
try { try {
return callerUGI.doAs(new PrivilegedExceptionAction<HttpURLConnection>() { return new DelegationTokenAuthenticatedURL(
@Override authenticator, connConfigurator).openConnection(url, token,
public HttpURLConnection run() throws Exception { doAsUser);
return new DelegationTokenAuthenticatedURL(
authenticator, connConfigurator).openConnection(url, token,
doAsUser);
}
});
} catch (UndeclaredThrowableException e) { } catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause()); throw new IOException(e.getCause());
} catch (InterruptedException e) { } catch (AuthenticationException ae) {
throw new IOException(e); throw new IOException(ae);
} }
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.KerberosTestUtils; import org.apache.hadoop.security.authentication.KerberosTestUtils;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
@ -47,9 +48,9 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.junit.After; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -77,20 +78,19 @@ public class TestTimelineAuthenticationFilter {
return Arrays.asList(new Object[][] { { false }, { true } }); return Arrays.asList(new Object[][] { { false }, { true } });
} }
private MiniKdc testMiniKDC; private static MiniKdc testMiniKDC;
private String keystoresDir; private static String keystoresDir;
private String sslConfDir; private static String sslConfDir;
private ApplicationHistoryServer testTimelineServer; private static ApplicationHistoryServer testTimelineServer;
private Configuration conf; private static Configuration conf;
private TimelineClient client; private static boolean withSsl;
private boolean withSsl;
public TestTimelineAuthenticationFilter(boolean withSsl) { public TestTimelineAuthenticationFilter(boolean withSsl) {
this.withSsl = withSsl; TestTimelineAuthenticationFilter.withSsl = withSsl;
} }
@Before @BeforeClass
public void setup() { public static void setup() {
try { try {
testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir); testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
testMiniKDC.start(); testMiniKDC.start();
@ -127,6 +127,7 @@ public class TestTimelineAuthenticationFilter {
"localhost:8190"); "localhost:8190");
conf.set("hadoop.proxyuser.HTTP.hosts", "*"); conf.set("hadoop.proxyuser.HTTP.hosts", "*");
conf.set("hadoop.proxyuser.HTTP.users", FOO_USER); conf.set("hadoop.proxyuser.HTTP.users", FOO_USER);
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 1);
if (withSsl) { if (withSsl) {
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
@ -146,14 +147,17 @@ public class TestTimelineAuthenticationFilter {
} catch (Exception e) { } catch (Exception e) {
assertTrue("Couldn't setup TimelineServer", false); assertTrue("Couldn't setup TimelineServer", false);
} }
client = TimelineClient.createTimelineClient();
client.init(conf);
client.start();
} }
@After private TimelineClient createTimelineClientForUGI() {
public void tearDown() throws Exception { TimelineClient client = TimelineClient.createTimelineClient();
client.init(conf);
client.start();
return client;
}
@AfterClass
public static void tearDown() throws Exception {
if (testMiniKDC != null) { if (testMiniKDC != null) {
testMiniKDC.stop(); testMiniKDC.stop();
} }
@ -162,10 +166,6 @@ public class TestTimelineAuthenticationFilter {
testTimelineServer.stop(); testTimelineServer.stop();
} }
if (client != null) {
client.stop();
}
if (withSsl) { if (withSsl) {
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir); KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
File base = new File(BASEDIR); File base = new File(BASEDIR);
@ -178,6 +178,7 @@ public class TestTimelineAuthenticationFilter {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() { KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
TimelineClient client = createTimelineClientForUGI();
TimelineEntity entityToStore = new TimelineEntity(); TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType( entityToStore.setEntityType(
TestTimelineAuthenticationFilter.class.getName()); TestTimelineAuthenticationFilter.class.getName());
@ -199,6 +200,7 @@ public class TestTimelineAuthenticationFilter {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() { KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
TimelineClient client = createTimelineClientForUGI();
TimelineDomain domainToStore = new TimelineDomain(); TimelineDomain domainToStore = new TimelineDomain();
domainToStore.setId(TestTimelineAuthenticationFilter.class.getName()); domainToStore.setId(TestTimelineAuthenticationFilter.class.getName());
domainToStore.setReaders("*"); domainToStore.setReaders("*");
@ -215,119 +217,96 @@ public class TestTimelineAuthenticationFilter {
@Test @Test
public void testDelegationTokenOperations() throws Exception { public void testDelegationTokenOperations() throws Exception {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() { TimelineClient httpUserClient =
@Override KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<TimelineClient>() {
public Void call() throws Exception { @Override
// Let HTTP user to get the delegation for itself public TimelineClient call() throws Exception {
Token<TimelineDelegationTokenIdentifier> token = return createTimelineClientForUGI();
client.getDelegationToken(
UserGroupInformation.getCurrentUser().getShortUserName());
Assert.assertNotNull(token);
TimelineDelegationTokenIdentifier tDT = token.decodeIdentifier();
Assert.assertNotNull(tDT);
Assert.assertEquals(new Text(HTTP_USER), tDT.getOwner());
// Renew token
long renewTime1 = client.renewDelegationToken(token);
Thread.sleep(100);
long renewTime2 = client.renewDelegationToken(token);
Assert.assertTrue(renewTime1 < renewTime2);
// Cancel token
client.cancelDelegationToken(token);
// Renew should not be successful because the token is canceled
try {
client.renewDelegationToken(token);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(
"Renewal request for unknown token"));
} }
});
// Let HTTP user to get the delegation token for FOO user UserGroupInformation httpUser =
UserGroupInformation fooUgi = UserGroupInformation.createProxyUser( KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<UserGroupInformation>() {
FOO_USER, UserGroupInformation.getCurrentUser()); @Override
token = fooUgi.doAs( public UserGroupInformation call() throws Exception {
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() { return UserGroupInformation.getCurrentUser();
@Override
public Token<TimelineDelegationTokenIdentifier> run()
throws Exception {
return client.getDelegationToken(
UserGroupInformation.getCurrentUser().getShortUserName());
}
});
Assert.assertNotNull(token);
tDT = token.decodeIdentifier();
Assert.assertNotNull(tDT);
Assert.assertEquals(new Text(FOO_USER), tDT.getOwner());
Assert.assertEquals(new Text(HTTP_USER), tDT.getRealUser());
// Renew token
final Token<TimelineDelegationTokenIdentifier> tokenToRenew = token;
renewTime1 = fooUgi.doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return client.renewDelegationToken(tokenToRenew);
}
});
renewTime2 = fooUgi.doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return client.renewDelegationToken(tokenToRenew);
}
});
Assert.assertTrue(renewTime1 < renewTime2);
// Cancel token
fooUgi.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
client.cancelDelegationToken(tokenToRenew);
return null;
}
});
// Renew should not be successful because the token is canceled
try {
fooUgi.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
client.renewDelegationToken(tokenToRenew);
return null;
}
});
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(
"Renewal request for unknown token"));
} }
});
// Let HTTP user to get the delegation for itself
Token<TimelineDelegationTokenIdentifier> token =
httpUserClient.getDelegationToken(httpUser.getShortUserName());
Assert.assertNotNull(token);
TimelineDelegationTokenIdentifier tDT = token.decodeIdentifier();
Assert.assertNotNull(tDT);
Assert.assertEquals(new Text(HTTP_USER), tDT.getOwner());
// Let HTTP user to get the delegation token for BAR user // Renew token
UserGroupInformation barUgi = UserGroupInformation.createProxyUser( long renewTime1 = httpUserClient.renewDelegationToken(token);
BAR_USER, UserGroupInformation.getCurrentUser()); Thread.sleep(100);
token = barUgi.doAs( long renewTime2 = httpUserClient.renewDelegationToken(token);
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() { Assert.assertTrue(renewTime1 < renewTime2);
// Cancel token
httpUserClient.cancelDelegationToken(token);
// Renew should not be successful because the token is canceled
try {
httpUserClient.renewDelegationToken(token);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(
"Renewal request for unknown token"));
}
// Let HTTP user to get the delegation token for FOO user
UserGroupInformation fooUgi = UserGroupInformation.createProxyUser(
FOO_USER, httpUser);
TimelineClient fooUserClient = fooUgi.doAs(
new PrivilegedExceptionAction<TimelineClient>() {
@Override @Override
public Token<TimelineDelegationTokenIdentifier> run() public TimelineClient run() throws Exception {
throws Exception { return createTimelineClientForUGI();
try {
Token<TimelineDelegationTokenIdentifier> token =
client.getDelegationToken(
UserGroupInformation.getCurrentUser().getShortUserName());
Assert.fail();
return token;
} catch (Exception e) {
Assert.assertTrue(e instanceof AuthorizationException);
return null;
}
} }
}); });
return null; token = fooUserClient.getDelegationToken(httpUser.getShortUserName());
} Assert.assertNotNull(token);
}); tDT = token.decodeIdentifier();
Assert.assertNotNull(tDT);
Assert.assertEquals(new Text(FOO_USER), tDT.getOwner());
Assert.assertEquals(new Text(HTTP_USER), tDT.getRealUser());
// Renew token as the renewer
final Token<TimelineDelegationTokenIdentifier> tokenToRenew = token;
renewTime1 = httpUserClient.renewDelegationToken(tokenToRenew);
renewTime2 = httpUserClient.renewDelegationToken(tokenToRenew);
Assert.assertTrue(renewTime1 < renewTime2);
// Cancel token
fooUserClient.cancelDelegationToken(tokenToRenew);
// Renew should not be successful because the token is canceled
try {
httpUserClient.renewDelegationToken(tokenToRenew);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(
e.getMessage().contains("Renewal request for unknown token"));
}
// Let HTTP user to get the delegation token for BAR user
UserGroupInformation barUgi = UserGroupInformation.createProxyUser(
BAR_USER, httpUser);
TimelineClient barUserClient = barUgi.doAs(
new PrivilegedExceptionAction<TimelineClient>() {
@Override
public TimelineClient run() {
return createTimelineClientForUGI();
}
});
try {
barUserClient.getDelegationToken(httpUser.getShortUserName());
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof AuthorizationException || e.getCause() instanceof AuthenticationException);
}
} }
} }