YARN-3021. YARN's delegation-token handling disallows certain trust setups to operate properly over DistCp. Contributed by Yongjun Zhang

(cherry picked from commit bb6dde68f1)
This commit is contained in:
Jian He 2015-04-16 19:33:15 -07:00
parent fa7b18e387
commit 42cf8e0554
5 changed files with 77 additions and 16 deletions

View File

@ -362,6 +362,8 @@ public interface MRJobConfig {
public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers"; public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
public static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude";
public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal"; public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens"; public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens";

View File

@ -101,6 +101,20 @@ public class TokenCache {
} }
} }
static boolean isTokenRenewalExcluded(FileSystem fs, Configuration conf) {
String [] nns =
conf.getStrings(MRJobConfig.JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE);
if (nns != null) {
String host = fs.getUri().getHost();
for(int i=0; i< nns.length; i++) {
if (nns[i].equals(host)) {
return true;
}
}
}
return false;
}
/** /**
* get delegation token for a specific FS * get delegation token for a specific FS
* @param fs * @param fs
@ -110,11 +124,16 @@ public class TokenCache {
*/ */
static void obtainTokensForNamenodesInternal(FileSystem fs, static void obtainTokensForNamenodesInternal(FileSystem fs,
Credentials credentials, Configuration conf) throws IOException { Credentials credentials, Configuration conf) throws IOException {
String delegTokenRenewer = Master.getMasterPrincipal(conf); // RM skips renewing token with empty renewer
String delegTokenRenewer = "";
if (!isTokenRenewalExcluded(fs, conf)) {
delegTokenRenewer = Master.getMasterPrincipal(conf);
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
throw new IOException( throw new IOException(
"Can't get Master Kerberos principal for use as renewer"); "Can't get Master Kerberos principal for use as renewer");
} }
}
mergeBinaryTokens(credentials, conf); mergeBinaryTokens(credentials, conf);
final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,

View File

@ -164,6 +164,9 @@ Release 2.8.0 - UNRELEASED
YARN-3436. Fix URIs in documantion of YARN web service REST APIs. YARN-3436. Fix URIs in documantion of YARN web service REST APIs.
(Bibin A Chundatt via ozawa) (Bibin A Chundatt via ozawa)
YARN-3021. YARN's delegation-token handling disallows certain trust setups
to operate properly over DistCp. (Yongjun Zhang via jianhe)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -79,7 +79,9 @@ public class DelegationTokenRenewer extends AbstractService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(DelegationTokenRenewer.class); LogFactory.getLog(DelegationTokenRenewer.class);
@VisibleForTesting
public static final Text HDFS_DELEGATION_KIND =
new Text("HDFS_DELEGATION_TOKEN");
public static final String SCHEME = "hdfs"; public static final String SCHEME = "hdfs";
// global single timer (daemon) // global single timer (daemon)
@ -244,7 +246,7 @@ public class DelegationTokenRenewer extends AbstractService {
String user) { String user) {
this.token = token; this.token = token;
this.user = user; this.user = user;
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
try { try {
AbstractDelegationTokenIdentifier identifier = AbstractDelegationTokenIdentifier identifier =
(AbstractDelegationTokenIdentifier) token.decodeIdentifier(); (AbstractDelegationTokenIdentifier) token.decodeIdentifier();
@ -424,10 +426,13 @@ public class DelegationTokenRenewer extends AbstractService {
boolean hasHdfsToken = false; boolean hasHdfsToken = false;
for (Token<?> token : tokens) { for (Token<?> token : tokens) {
if (token.isManaged()) { if (token.isManaged()) {
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
LOG.info(applicationId + " found existing hdfs token " + token); LOG.info(applicationId + " found existing hdfs token " + token);
hasHdfsToken = true; hasHdfsToken = true;
} }
if (skipTokenRenewal(token)) {
continue;
}
DelegationTokenToRenew dttr = allTokens.get(token); DelegationTokenToRenew dttr = allTokens.get(token);
if (dttr == null) { if (dttr == null) {
@ -509,13 +514,25 @@ public class DelegationTokenRenewer extends AbstractService {
} }
} }
/*
* Skip renewing token if the renewer of the token is set to ""
* Caller is expected to have examined that token.isManaged() returns
* true before calling this method.
*/
private boolean skipTokenRenewal(Token<?> token)
throws IOException {
@SuppressWarnings("unchecked")
Text renewer = ((Token<AbstractDelegationTokenIdentifier>)token).
decodeIdentifier().getRenewer();
return (renewer != null && renewer.toString().equals(""));
}
/** /**
* set task to renew the token * set task to renew the token
*/ */
@VisibleForTesting @VisibleForTesting
protected void setTimerForTokenRenewal(DelegationTokenToRenew token) protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException { throws IOException {
// calculate timer time // calculate timer time
long expiresIn = token.expirationDate - System.currentTimeMillis(); long expiresIn = token.expirationDate - System.currentTimeMillis();
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
@ -558,7 +575,7 @@ public class DelegationTokenRenewer extends AbstractService {
if (hasProxyUserPrivileges if (hasProxyUserPrivileges
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { && dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {
final Collection<ApplicationId> applicationIds; final Collection<ApplicationId> applicationIds;
synchronized (dttr.referringAppIds) { synchronized (dttr.referringAppIds) {
@ -575,7 +592,7 @@ public class DelegationTokenRenewer extends AbstractService {
synchronized (tokenSet) { synchronized (tokenSet) {
while (iter.hasNext()) { while (iter.hasNext()) {
DelegationTokenToRenew t = iter.next(); DelegationTokenToRenew t = iter.next();
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
iter.remove(); iter.remove();
allTokens.remove(t.token); allTokens.remove(t.token);
t.cancelTimer(); t.cancelTimer();

View File

@ -110,7 +110,8 @@ import com.google.common.base.Supplier;
public class TestDelegationTokenRenewer { public class TestDelegationTokenRenewer {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class); LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN"); private static final Text KIND =
DelegationTokenRenewer.HDFS_DELEGATION_KIND;
private static BlockingQueue<Event> eventQueue; private static BlockingQueue<Event> eventQueue;
private static volatile AtomicInteger counter; private static volatile AtomicInteger counter;
@ -481,6 +482,25 @@ public class TestDelegationTokenRenewer {
fail("App submission with a cancelled token should have failed"); fail("App submission with a cancelled token should have failed");
} }
// Testcase for YARN-3021, let RM skip renewing token if the renewer string
// is empty
@Test(timeout=60000)
public void testAppTokenWithNonRenewer() throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
// Test would fail if using non-empty renewer string here
MyToken token = dfs.getDelegationToken("");
token.cancelToken();
Credentials ts = new Credentials();
ts.addToken(token.getKind(), token);
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplicationSync(appId, ts, true, "user");
}
/** /**
* Basic idea of the test: * Basic idea of the test:
* 1. register a token for 2 seconds with no cancel at the end * 1. register a token for 2 seconds with no cancel at the end
@ -721,7 +741,7 @@ public class TestDelegationTokenRenewer {
throws IOException, InterruptedException, BrokenBarrierException { throws IOException, InterruptedException, BrokenBarrierException {
final Credentials credsx = new Credentials(); final Credentials credsx = new Credentials();
final Token<DelegationTokenIdentifier> tokenx = mock(Token.class); final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); when(tokenx.getKind()).thenReturn(KIND);
DelegationTokenIdentifier dtId1 = DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"), new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
new Text("user1")); new Text("user1"));
@ -765,7 +785,7 @@ public class TestDelegationTokenRenewer {
// this token uses barriers to block during renew // this token uses barriers to block during renew
final Credentials creds1 = new Credentials(); final Credentials creds1 = new Credentials();
final Token<DelegationTokenIdentifier> token1 = mock(Token.class); final Token<DelegationTokenIdentifier> token1 = mock(Token.class);
when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); when(token1.getKind()).thenReturn(KIND);
DelegationTokenIdentifier dtId1 = DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"), new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
new Text("user1")); new Text("user1"));
@ -783,7 +803,7 @@ public class TestDelegationTokenRenewer {
// this dummy token fakes renewing // this dummy token fakes renewing
final Credentials creds2 = new Credentials(); final Credentials creds2 = new Credentials();
final Token<DelegationTokenIdentifier> token2 = mock(Token.class); final Token<DelegationTokenIdentifier> token2 = mock(Token.class);
when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); when(token2.getKind()).thenReturn(KIND);
when(token2.decodeIdentifier()).thenReturn(dtId1); when(token2.decodeIdentifier()).thenReturn(dtId1);
creds2.addToken(new Text("token"), token2); creds2.addToken(new Text("token"), token2);
doReturn(true).when(token2).isManaged(); doReturn(true).when(token2).isManaged();