MAPREDUCE-7073. Optimize TokenCache#obtainTokensForNamenodesInternal

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 1a95a4524a)
This commit is contained in:
Bibin A Chundatt 2018-05-02 16:14:28 +09:00 committed by Akira Ajisaka
parent 5fef28d0d4
commit 254933cf31
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
2 changed files with 18 additions and 14 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -96,8 +97,9 @@ public class TokenCache {
for(Path p: ps) { for(Path p: ps) {
fsSet.add(p.getFileSystem(conf)); fsSet.add(p.getFileSystem(conf));
} }
String masterPrincipal = Master.getMasterPrincipal(conf);
for (FileSystem fs : fsSet) { for (FileSystem fs : fsSet) {
obtainTokensForNamenodesInternal(fs, credentials, conf); obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
} }
} }
@ -123,14 +125,16 @@ public class TokenCache {
* @throws IOException * @throws IOException
*/ */
static void obtainTokensForNamenodesInternal(FileSystem fs, static void obtainTokensForNamenodesInternal(FileSystem fs,
Credentials credentials, Configuration conf) throws IOException { Credentials credentials, Configuration conf, String renewer)
throws IOException {
// RM skips renewing token with empty renewer // RM skips renewing token with empty renewer
String delegTokenRenewer = ""; String delegTokenRenewer = "";
if (!isTokenRenewalExcluded(fs, conf)) { if (!isTokenRenewalExcluded(fs, conf)) {
delegTokenRenewer = Master.getMasterPrincipal(conf); if (StringUtils.isEmpty(renewer)) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
throw new IOException( throw new IOException(
"Can't get Master Kerberos principal for use as renewer"); "Can't get Master Kerberos principal for use as renewer");
} else {
delegTokenRenewer = renewer;
} }
} }

View File

@ -57,7 +57,7 @@ public class TestTokenCache {
public void testObtainTokens() throws Exception { public void testObtainTokens() throws Exception {
Credentials credentials = new Credentials(); Credentials credentials = new Credentials();
FileSystem fs = mock(FileSystem.class); FileSystem fs = mock(FileSystem.class);
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf); TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf, renewer);
verify(fs).addDelegationTokens(eq(renewer), eq(credentials)); verify(fs).addDelegationTokens(eq(renewer), eq(credentials));
} }
@ -105,23 +105,23 @@ public class TestTokenCache {
checkToken(creds, newerToken1); checkToken(creds, newerToken1);
// get token for fs1, see that fs2's token was loaded // get token for fs1, see that fs2's token was loaded
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf, renewer);
checkToken(creds, newerToken1, token2); checkToken(creds, newerToken1, token2);
// get token for fs2, nothing should change since already present // get token for fs2, nothing should change since already present
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf, renewer);
checkToken(creds, newerToken1, token2); checkToken(creds, newerToken1, token2);
// get token for fs3, should only add token for fs3 // get token for fs3, should only add token for fs3
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf, renewer);
Token<?> token3 = creds.getToken(new Text(fs3.getCanonicalServiceName())); Token<?> token3 = creds.getToken(new Text(fs3.getCanonicalServiceName()));
assertTrue(token3 != null); assertTrue(token3 != null);
checkToken(creds, newerToken1, token2, token3); checkToken(creds, newerToken1, token2, token3);
// be paranoid, check one last time that nothing changes // be paranoid, check one last time that nothing changes
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf, renewer);
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf, renewer);
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf, renewer);
checkToken(creds, newerToken1, token2, token3); checkToken(creds, newerToken1, token2, token3);
} }
@ -202,7 +202,7 @@ public class TestTokenCache {
// wait to set, else the obtain tokens call above will fail with FNF // wait to set, else the obtain tokens call above will fail with FNF
conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile); conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile);
creds.writeTokenStorageFile(new Path(binaryTokenFile), conf); creds.writeTokenStorageFile(new Path(binaryTokenFile), conf);
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf); TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf, renewer);
String fs_addr = fs1.getCanonicalServiceName(); String fs_addr = fs1.getCanonicalServiceName();
Token<?> nnt = TokenCache.getDelegationToken(creds, fs_addr); Token<?> nnt = TokenCache.getDelegationToken(creds, fs_addr);
assertNotNull("Token for nn is null", nnt); assertNotNull("Token for nn is null", nnt);