MAPREDUCE-3849. Change TokenCache's reading of the binary token file (Daryn Sharp via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1245099 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-02-16 17:49:20 +00:00
parent 20816218f0
commit cfc4ad76a3
3 changed files with 109 additions and 22 deletions

View File

@ -94,6 +94,9 @@ Release 0.23.2 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-3849. Change TokenCache's reading of the binary token file
(Daryn Sharp via bobby)
MAPREDUCE-3854. Fixed and reenabled tests related to MR child JVM's
environmental variables in TestMiniMRChildTask. (Tom White via vinodkv)

View File

@ -114,31 +114,10 @@ static void obtainTokensForNamenodesInternal(FileSystem fs,
throw new IOException(
"Can't get Master Kerberos principal for use as renewer");
}
boolean readFile = true;
mergeBinaryTokens(credentials, conf);
String fsName = fs.getCanonicalServiceName();
if (TokenCache.getDelegationToken(credentials, fsName) == null) {
//TODO: Need to come up with a better place to put
//this block of code to do with reading the file
if (readFile) {
readFile = false;
String binaryTokenFilename =
conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
if (binaryTokenFilename != null) {
Credentials binary;
try {
binary = Credentials.readTokenStorageFile(
new Path("file:///" + binaryTokenFilename), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
credentials.addAll(binary);
}
if (TokenCache.getDelegationToken(credentials, fsName) != null) {
LOG.debug("DT for " + fsName + " is already present");
return;
}
}
List<Token<?>> tokens =
fs.getDelegationTokens(delegTokenRenewer, credentials);
if (tokens != null) {
@ -161,6 +140,22 @@ static void obtainTokensForNamenodesInternal(FileSystem fs,
}
}
private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
String binaryTokenFilename =
conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
if (binaryTokenFilename != null) {
Credentials binary;
try {
binary = Credentials.readTokenStorageFile(
new Path("file:///" + binaryTokenFilename), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
// supplement existing tokens with the tokens in the binary file
creds.mergeAll(binary);
}
}
/**
* file name used on HDFS for generated job token
*/

View File

@ -21,23 +21,27 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@ -162,6 +166,91 @@ public List<Token<?>> answer(InvocationOnMock invocation)
return mockFs;
}
@Test
@SuppressWarnings("deprecation")
public void testBinaryCredentials() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
String renewer = Master.getMasterPrincipal(conf);
Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","test/build/data"));
// ick, but need fq path minus file:/
String binaryTokenFile = FileSystem.getLocal(conf).makeQualified(
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
FileSystem fs1 = createFileSystemForService("service1");
FileSystem fs2 = createFileSystemForService("service2");
FileSystem fs3 = createFileSystemForService("service3");
// get the tokens for fs1 & fs2 and write out to binary creds file
Credentials creds = new Credentials();
Token<?> token1 = fs1.getDelegationToken(renewer);
Token<?> token2 = fs2.getDelegationToken(renewer);
creds.addToken(token1.getService(), token1);
creds.addToken(token2.getService(), token2);
// wait to set, else the obtain tokens call above will fail with FNF
conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile);
creds.writeTokenStorageFile(new Path(binaryTokenFile), conf);
// re-init creds and add a newer token for fs1
creds = new Credentials();
Token<?> newerToken1 = fs1.getDelegationToken(renewer);
assertFalse(newerToken1.equals(token1));
creds.addToken(newerToken1.getService(), newerToken1);
checkToken(creds, newerToken1);
// get token for fs1, see that fs2's token was loaded
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf);
checkToken(creds, newerToken1, token2);
// get token for fs2, nothing should change since already present
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf);
checkToken(creds, newerToken1, token2);
// get token for fs3, should only add token for fs3
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf);
Token<?> token3 = creds.getToken(new Text(fs3.getCanonicalServiceName()));
assertTrue(token3 != null);
checkToken(creds, newerToken1, token2, token3);
// be paranoid, check one last time that nothing changes
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf);
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf);
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf);
checkToken(creds, newerToken1, token2, token3);
}
private void checkToken(Credentials creds, Token<?> ... tokens) {
assertEquals(tokens.length, creds.getAllTokens().size());
for (Token<?> token : tokens) {
Token<?> credsToken = creds.getToken(token.getService());
assertTrue(credsToken != null);
assertEquals(token, credsToken);
}
}
@SuppressWarnings("deprecation")
private FileSystem createFileSystemForService(final String service)
throws IOException {
FileSystem mockFs = mock(FileSystem.class);
when(mockFs.getCanonicalServiceName()).thenReturn(service);
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
new Answer<Token<?>>() {
int unique = 0;
@Override
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
Token<?> token = new Token<TokenIdentifier>();
token.setService(new Text(service));
// use unique value so when we restore from token storage, we can
// tell if it's really the same token
token.setKind(new Text("token" + unique++));
return token;
}
});
return mockFs;
}
@Test
public void testCleanUpTokenReferral() throws Exception {
Configuration conf = new Configuration();