YARN-3021. YARN's delegation-token handling disallows certain trust setups to operate properly over DistCp. Contributed by Yongjun Zhang

This commit is contained in:
Jian He 2015-04-16 19:33:15 -07:00
parent 9595cc003c
commit bb6dde68f1
5 changed files with 77 additions and 16 deletions

View File

@ -367,6 +367,8 @@ public interface MRJobConfig {
public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
public static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude";
public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens";

View File

@ -101,6 +101,20 @@ public class TokenCache {
}
}
static boolean isTokenRenewalExcluded(FileSystem fs, Configuration conf) {
String [] nns =
conf.getStrings(MRJobConfig.JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE);
if (nns != null) {
String host = fs.getUri().getHost();
for(int i=0; i< nns.length; i++) {
if (nns[i].equals(host)) {
return true;
}
}
}
return false;
}
/**
* get delegation token for a specific FS
* @param fs
@ -110,11 +124,16 @@ public class TokenCache {
*/
static void obtainTokensForNamenodesInternal(FileSystem fs,
Credentials credentials, Configuration conf) throws IOException {
String delegTokenRenewer = Master.getMasterPrincipal(conf);
// RM skips renewing token with empty renewer
String delegTokenRenewer = "";
if (!isTokenRenewalExcluded(fs, conf)) {
delegTokenRenewer = Master.getMasterPrincipal(conf);
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
throw new IOException(
"Can't get Master Kerberos principal for use as renewer");
}
}
mergeBinaryTokens(credentials, conf);
final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,

View File

@ -212,6 +212,9 @@ Release 2.8.0 - UNRELEASED
YARN-3436. Fix URIs in documantion of YARN web service REST APIs.
(Bibin A Chundatt via ozawa)
YARN-3021. YARN's delegation-token handling disallows certain trust setups
to operate properly over DistCp. (Yongjun Zhang via jianhe)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -79,7 +79,9 @@ public class DelegationTokenRenewer extends AbstractService {
private static final Log LOG =
LogFactory.getLog(DelegationTokenRenewer.class);
@VisibleForTesting
public static final Text HDFS_DELEGATION_KIND =
new Text("HDFS_DELEGATION_TOKEN");
public static final String SCHEME = "hdfs";
// global single timer (daemon)
@ -244,7 +246,7 @@ public class DelegationTokenRenewer extends AbstractService {
String user) {
this.token = token;
this.user = user;
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
try {
AbstractDelegationTokenIdentifier identifier =
(AbstractDelegationTokenIdentifier) token.decodeIdentifier();
@ -424,10 +426,13 @@ public class DelegationTokenRenewer extends AbstractService {
boolean hasHdfsToken = false;
for (Token<?> token : tokens) {
if (token.isManaged()) {
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
LOG.info(applicationId + " found existing hdfs token " + token);
hasHdfsToken = true;
}
if (skipTokenRenewal(token)) {
continue;
}
DelegationTokenToRenew dttr = allTokens.get(token);
if (dttr == null) {
@ -509,13 +514,25 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
/*
* Skip renewing token if the renewer of the token is set to ""
* Caller is expected to have examined that token.isManaged() returns
* true before calling this method.
*/
private boolean skipTokenRenewal(Token<?> token)
throws IOException {
@SuppressWarnings("unchecked")
Text renewer = ((Token<AbstractDelegationTokenIdentifier>)token).
decodeIdentifier().getRenewer();
return (renewer != null && renewer.toString().equals(""));
}
/**
* set task to renew the token
*/
@VisibleForTesting
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException {
// calculate timer time
long expiresIn = token.expirationDate - System.currentTimeMillis();
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
@ -558,7 +575,7 @@ public class DelegationTokenRenewer extends AbstractService {
if (hasProxyUserPrivileges
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
&& dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {
final Collection<ApplicationId> applicationIds;
synchronized (dttr.referringAppIds) {
@ -575,7 +592,7 @@ public class DelegationTokenRenewer extends AbstractService {
synchronized (tokenSet) {
while (iter.hasNext()) {
DelegationTokenToRenew t = iter.next();
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
iter.remove();
allTokens.remove(t.token);
t.cancelTimer();

View File

@ -110,7 +110,8 @@ import com.google.common.base.Supplier;
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
private static final Text KIND =
DelegationTokenRenewer.HDFS_DELEGATION_KIND;
private static BlockingQueue<Event> eventQueue;
private static volatile AtomicInteger counter;
@ -481,6 +482,25 @@ public class TestDelegationTokenRenewer {
fail("App submission with a cancelled token should have failed");
}
// Testcase for YARN-3021, let RM skip renewing token if the renewer string
// is empty
@Test(timeout=60000)
public void testAppTokenWithNonRenewer() throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
// Test would fail if using non-empty renewer string here
MyToken token = dfs.getDelegationToken("");
token.cancelToken();
Credentials ts = new Credentials();
ts.addToken(token.getKind(), token);
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplicationSync(appId, ts, true, "user");
}
/**
* Basic idea of the test:
* 1. register a token for 2 seconds with no cancel at the end
@ -721,7 +741,7 @@ public class TestDelegationTokenRenewer {
throws IOException, InterruptedException, BrokenBarrierException {
final Credentials credsx = new Credentials();
final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
when(tokenx.getKind()).thenReturn(KIND);
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
new Text("user1"));
@ -765,7 +785,7 @@ public class TestDelegationTokenRenewer {
// this token uses barriers to block during renew
final Credentials creds1 = new Credentials();
final Token<DelegationTokenIdentifier> token1 = mock(Token.class);
when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
when(token1.getKind()).thenReturn(KIND);
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
new Text("user1"));
@ -783,7 +803,7 @@ public class TestDelegationTokenRenewer {
// this dummy token fakes renewing
final Credentials creds2 = new Credentials();
final Token<DelegationTokenIdentifier> token2 = mock(Token.class);
when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
when(token2.getKind()).thenReturn(KIND);
when(token2.decodeIdentifier()).thenReturn(dtId1);
creds2.addToken(new Text("token"), token2);
doReturn(true).when(token2).isManaged();