YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the Token is not present. Contributed by Zhijie Shen

This commit is contained in:
Jian He 2014-09-18 14:36:23 -07:00
parent a337f0e354
commit eb92cc67df
3 changed files with 66 additions and 37 deletions

View File

@ -394,6 +394,9 @@ Release 2.6.0 - UNRELEASED
YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
Du via jlowe) Du via jlowe)
YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the
Token is not present. (Zhijie Shen via jianhe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -110,7 +110,8 @@ public class YarnClientImpl extends YarnClient {
private AHSClient historyClient; private AHSClient historyClient;
private boolean historyServiceEnabled; private boolean historyServiceEnabled;
protected TimelineClient timelineClient; protected TimelineClient timelineClient;
protected Text timelineService; @VisibleForTesting
Text timelineService;
protected boolean timelineServiceEnabled; protected boolean timelineServiceEnabled;
private static final String ROOT = "root"; private static final String ROOT = "root";
@ -272,12 +273,6 @@ public class YarnClientImpl extends YarnClient {
private void addTimelineDelegationToken( private void addTimelineDelegationToken(
ContainerLaunchContext clc) throws YarnException, IOException { ContainerLaunchContext clc) throws YarnException, IOException {
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> timelineDelegationToken =
timelineClient.getDelegationToken(
UserGroupInformation.getCurrentUser().getUserName());
if (timelineDelegationToken == null) {
return;
}
Credentials credentials = new Credentials(); Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer(); DataInputByteBuffer dibb = new DataInputByteBuffer();
ByteBuffer tokens = clc.getTokens(); ByteBuffer tokens = clc.getTokens();
@ -290,11 +285,15 @@ public class YarnClientImpl extends YarnClient {
// one more // one more
for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : credentials for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : credentials
.getAllTokens()) { .getAllTokens()) {
TokenIdentifier tokenIdentifier = token.decodeIdentifier(); if (token.getKind().equals(TimelineDelegationTokenIdentifier.KIND_NAME)) {
if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) {
return; return;
} }
} }
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
timelineDelegationToken = getTimelineDelegationToken();
if (timelineDelegationToken == null) {
return;
}
credentials.addToken(timelineService, timelineDelegationToken); credentials.addToken(timelineService, timelineDelegationToken);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Add timline delegation token into credentials: " LOG.debug("Add timline delegation token into credentials: "
@ -306,6 +305,13 @@ public class YarnClientImpl extends YarnClient {
clc.setTokens(tokens); clc.setTokens(tokens);
} }
@VisibleForTesting
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
getTimelineDelegationToken() throws IOException, YarnException {
return timelineClient.getDelegationToken(
UserGroupInformation.getCurrentUser().getUserName());
}
@Private @Private
@VisibleForTesting @VisibleForTesting
protected boolean isSecurityEnabled() { protected boolean isSecurityEnabled() {

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -39,6 +41,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -745,10 +748,13 @@ public class TestYarnClient {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
TimelineDelegationTokenIdentifier timelineDT =
new TimelineDelegationTokenIdentifier();
final Token<TimelineDelegationTokenIdentifier> dToken = final Token<TimelineDelegationTokenIdentifier> dToken =
new Token<TimelineDelegationTokenIdentifier>(); new Token<TimelineDelegationTokenIdentifier>(
timelineDT.getBytes(), new byte[0], timelineDT.getKind(), new Text());
// crate a mock client // crate a mock client
YarnClientImpl client = new YarnClientImpl() { YarnClientImpl client = spy(new YarnClientImpl() {
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
if (getConfig().getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, if (getConfig().getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@ -784,34 +790,48 @@ public class TestYarnClient {
public boolean isSecurityEnabled() { public boolean isSecurityEnabled() {
return true; return true;
} }
}; });
client.init(conf); client.init(conf);
client.start(); client.start();
ApplicationSubmissionContext context = try {
mock(ApplicationSubmissionContext.class); // when i == 0, timeline DT already exists, no need to get one more
ApplicationId applicationId = ApplicationId.newInstance(0, 1); // when i == 1, timeline DT doesn't exist, need to get one more
when(context.getApplicationId()).thenReturn(applicationId); for (int i = 0; i < 2; ++i) {
DataOutputBuffer dob = new DataOutputBuffer(); ApplicationSubmissionContext context =
Credentials credentials = new Credentials(); mock(ApplicationSubmissionContext.class);
credentials.writeTokenStorageToStream(dob); ApplicationId applicationId = ApplicationId.newInstance(0, i + 1);
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); when(context.getApplicationId()).thenReturn(applicationId);
ContainerLaunchContext clc = ContainerLaunchContext.newInstance( DataOutputBuffer dob = new DataOutputBuffer();
null, null, null, null, tokens, null); Credentials credentials = new Credentials();
when(context.getAMContainerSpec()).thenReturn(clc); if (i == 0) {
client.submitApplication(context); credentials.addToken(client.timelineService, dToken);
// Check whether token is added or not }
credentials = new Credentials(); credentials.writeTokenStorageToStream(dob);
DataInputByteBuffer dibb = new DataInputByteBuffer(); ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
tokens = clc.getTokens(); ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
if (tokens != null) { null, null, null, null, tokens, null);
dibb.reset(tokens); when(context.getAMContainerSpec()).thenReturn(clc);
credentials.readTokenStorageStream(dibb); client.submitApplication(context);
tokens.rewind(); if (i == 0) {
// GetTimelineDelegationToken shouldn't be called
verify(client, never()).getTimelineDelegationToken();
}
// In either way, token should be there
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
tokens = clc.getTokens();
if (tokens != null) {
dibb.reset(tokens);
credentials.readTokenStorageStream(dibb);
tokens.rewind();
}
Collection<Token<? extends TokenIdentifier>> dTokens =
credentials.getAllTokens();
Assert.assertEquals(1, dTokens.size());
Assert.assertEquals(dToken, dTokens.iterator().next());
}
} finally {
client.stop();
} }
Collection<Token<? extends TokenIdentifier>> dTokens =
credentials.getAllTokens();
Assert.assertEquals(1, dTokens.size());
Assert.assertEquals(dToken, dTokens.iterator().next());
client.stop();
} }
} }