YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the Token is not present. Contributed by Zhijie Shen
(cherry picked from commit eb92cc67df
)
This commit is contained in:
parent
d9273a9547
commit
4cabeef931
|
@ -364,6 +364,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
|
YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
|
||||||
Du via jlowe)
|
Du via jlowe)
|
||||||
|
|
||||||
|
YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the
|
||||||
|
Token is not present. (Zhijie Shen via jianhe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -110,7 +110,8 @@ public class YarnClientImpl extends YarnClient {
|
||||||
private AHSClient historyClient;
|
private AHSClient historyClient;
|
||||||
private boolean historyServiceEnabled;
|
private boolean historyServiceEnabled;
|
||||||
protected TimelineClient timelineClient;
|
protected TimelineClient timelineClient;
|
||||||
protected Text timelineService;
|
@VisibleForTesting
|
||||||
|
Text timelineService;
|
||||||
protected boolean timelineServiceEnabled;
|
protected boolean timelineServiceEnabled;
|
||||||
|
|
||||||
private static final String ROOT = "root";
|
private static final String ROOT = "root";
|
||||||
|
@ -272,12 +273,6 @@ public class YarnClientImpl extends YarnClient {
|
||||||
|
|
||||||
private void addTimelineDelegationToken(
|
private void addTimelineDelegationToken(
|
||||||
ContainerLaunchContext clc) throws YarnException, IOException {
|
ContainerLaunchContext clc) throws YarnException, IOException {
|
||||||
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> timelineDelegationToken =
|
|
||||||
timelineClient.getDelegationToken(
|
|
||||||
UserGroupInformation.getCurrentUser().getUserName());
|
|
||||||
if (timelineDelegationToken == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
||||||
ByteBuffer tokens = clc.getTokens();
|
ByteBuffer tokens = clc.getTokens();
|
||||||
|
@ -290,11 +285,15 @@ public class YarnClientImpl extends YarnClient {
|
||||||
// one more
|
// one more
|
||||||
for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : credentials
|
for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : credentials
|
||||||
.getAllTokens()) {
|
.getAllTokens()) {
|
||||||
TokenIdentifier tokenIdentifier = token.decodeIdentifier();
|
if (token.getKind().equals(TimelineDelegationTokenIdentifier.KIND_NAME)) {
|
||||||
if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
|
||||||
|
timelineDelegationToken = getTimelineDelegationToken();
|
||||||
|
if (timelineDelegationToken == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
credentials.addToken(timelineService, timelineDelegationToken);
|
credentials.addToken(timelineService, timelineDelegationToken);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Add timline delegation token into credentials: "
|
LOG.debug("Add timline delegation token into credentials: "
|
||||||
|
@ -306,6 +305,13 @@ public class YarnClientImpl extends YarnClient {
|
||||||
clc.setTokens(tokens);
|
clc.setTokens(tokens);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
|
||||||
|
getTimelineDelegationToken() throws IOException, YarnException {
|
||||||
|
return timelineClient.getDelegationToken(
|
||||||
|
UserGroupInformation.getCurrentUser().getUserName());
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected boolean isSecurityEnabled() {
|
protected boolean isSecurityEnabled() {
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -39,6 +41,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -745,10 +748,13 @@ public class TestYarnClient {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
|
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
|
||||||
|
TimelineDelegationTokenIdentifier timelineDT =
|
||||||
|
new TimelineDelegationTokenIdentifier();
|
||||||
final Token<TimelineDelegationTokenIdentifier> dToken =
|
final Token<TimelineDelegationTokenIdentifier> dToken =
|
||||||
new Token<TimelineDelegationTokenIdentifier>();
|
new Token<TimelineDelegationTokenIdentifier>(
|
||||||
|
timelineDT.getBytes(), new byte[0], timelineDT.getKind(), new Text());
|
||||||
// crate a mock client
|
// crate a mock client
|
||||||
YarnClientImpl client = new YarnClientImpl() {
|
YarnClientImpl client = spy(new YarnClientImpl() {
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
if (getConfig().getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
if (getConfig().getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
||||||
|
@ -784,34 +790,48 @@ public class TestYarnClient {
|
||||||
public boolean isSecurityEnabled() {
|
public boolean isSecurityEnabled() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
};
|
});
|
||||||
client.init(conf);
|
client.init(conf);
|
||||||
client.start();
|
client.start();
|
||||||
ApplicationSubmissionContext context =
|
try {
|
||||||
mock(ApplicationSubmissionContext.class);
|
// when i == 0, timeline DT already exists, no need to get one more
|
||||||
ApplicationId applicationId = ApplicationId.newInstance(0, 1);
|
// when i == 1, timeline DT doesn't exist, need to get one more
|
||||||
when(context.getApplicationId()).thenReturn(applicationId);
|
for (int i = 0; i < 2; ++i) {
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
ApplicationSubmissionContext context =
|
||||||
Credentials credentials = new Credentials();
|
mock(ApplicationSubmissionContext.class);
|
||||||
credentials.writeTokenStorageToStream(dob);
|
ApplicationId applicationId = ApplicationId.newInstance(0, i + 1);
|
||||||
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
when(context.getApplicationId()).thenReturn(applicationId);
|
||||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
null, null, null, null, tokens, null);
|
Credentials credentials = new Credentials();
|
||||||
when(context.getAMContainerSpec()).thenReturn(clc);
|
if (i == 0) {
|
||||||
client.submitApplication(context);
|
credentials.addToken(client.timelineService, dToken);
|
||||||
// Check whether token is added or not
|
}
|
||||||
credentials = new Credentials();
|
credentials.writeTokenStorageToStream(dob);
|
||||||
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
tokens = clc.getTokens();
|
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||||
if (tokens != null) {
|
null, null, null, null, tokens, null);
|
||||||
dibb.reset(tokens);
|
when(context.getAMContainerSpec()).thenReturn(clc);
|
||||||
credentials.readTokenStorageStream(dibb);
|
client.submitApplication(context);
|
||||||
tokens.rewind();
|
if (i == 0) {
|
||||||
|
// GetTimelineDelegationToken shouldn't be called
|
||||||
|
verify(client, never()).getTimelineDelegationToken();
|
||||||
|
}
|
||||||
|
// In either way, token should be there
|
||||||
|
credentials = new Credentials();
|
||||||
|
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
||||||
|
tokens = clc.getTokens();
|
||||||
|
if (tokens != null) {
|
||||||
|
dibb.reset(tokens);
|
||||||
|
credentials.readTokenStorageStream(dibb);
|
||||||
|
tokens.rewind();
|
||||||
|
}
|
||||||
|
Collection<Token<? extends TokenIdentifier>> dTokens =
|
||||||
|
credentials.getAllTokens();
|
||||||
|
Assert.assertEquals(1, dTokens.size());
|
||||||
|
Assert.assertEquals(dToken, dTokens.iterator().next());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
}
|
}
|
||||||
Collection<Token<? extends TokenIdentifier>> dTokens =
|
|
||||||
credentials.getAllTokens();
|
|
||||||
Assert.assertEquals(1, dTokens.size());
|
|
||||||
Assert.assertEquals(dToken, dTokens.iterator().next());
|
|
||||||
client.stop();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue