MAPREDUCE-4764. repair TestBinaryTokenFile (Ivan A. Veselovsky via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1413739 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2012-11-26 17:37:05 +00:00
parent 6c5f37e46d
commit 481f451ca2
2 changed files with 113 additions and 59 deletions

hadoop-mapreduce-project/CHANGES.txt

@@ -596,6 +596,8 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4802. Takes a long time to load the task list on the AM for
     large jobs (Ravi Prakash via bobby)
 
+    MAPREDUCE-4764. repair TestBinaryTokenFile (Ivan A. Veselovsky via bobby)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java

@@ -35,26 +35,28 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
-@Ignore
 public class TestBinaryTokenFile {
+
+  private static final String KEY_SECURITY_TOKEN_FILE_NAME = "key-security-token-file";
+  private static final String DELEGATION_TOKEN_KEY = "Hdfs";
+
   // my sleep class
   static class MySleepMapper extends SleepJob.SleepMapper {
     /**
@@ -63,29 +65,65 @@ public class TestBinaryTokenFile {
     @Override
     public void map(IntWritable key, IntWritable value, Context context)
         throws IOException, InterruptedException {
-      // get token storage and a key
-      Credentials ts = context.getCredentials();
-      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
-      if(dts.size() != 2) { // one job token and one delegation token
-        throw new RuntimeException("tokens are not available"); // fail the test
-      }
-      Token<? extends TokenIdentifier> dt = ts.getToken(new Text("Hdfs"));
-      //Verify that dt is same as the token in the file
-      String tokenFile = context.getConfiguration().get(
-          "mapreduce.job.credentials.binary");
-      Credentials cred = new Credentials();
-      cred.readTokenStorageStream(new DataInputStream(new FileInputStream(
+      // get context token storage:
+      final Credentials contextCredentials = context.getCredentials();
+
+      final Collection<Token<? extends TokenIdentifier>> contextTokenCollection = contextCredentials.getAllTokens();
+      for (Token<? extends TokenIdentifier> t : contextTokenCollection) {
+        System.out.println("Context token: [" + t + "]");
+      }
+      if (contextTokenCollection.size() != 2) { // one job token and one delegation token
+        // fail the test:
+        throw new RuntimeException("Exactly 2 tokens are expected in the contextTokenCollection: " +
+            "one job token and one delegation token, but was found " + contextTokenCollection.size() + " tokens.");
+      }
+
+      final Token<? extends TokenIdentifier> dt = contextCredentials.getToken(new Text(DELEGATION_TOKEN_KEY));
+      if (dt == null) {
+        throw new RuntimeException("Token for key ["+DELEGATION_TOKEN_KEY+"] not found in the job context.");
+      }
+
+      String tokenFile0 = context.getConfiguration().get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+      if (tokenFile0 != null) {
+        throw new RuntimeException("Token file key ["+MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY+"] found in the configuration. It should have been removed from the configuration.");
+      }
+
+      final String tokenFile = context.getConfiguration().get(KEY_SECURITY_TOKEN_FILE_NAME);
+      if (tokenFile == null) {
+        throw new RuntimeException("Token file key ["+KEY_SECURITY_TOKEN_FILE_NAME+"] not found in the job configuration.");
+      }
+
+      final Credentials binaryCredentials = new Credentials();
+      binaryCredentials.readTokenStorageStream(new DataInputStream(new FileInputStream(
           tokenFile)));
-      for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
-        if (!dt.equals(t)) {
-          throw new RuntimeException(
-              "Delegation token in job is not same as the token passed in file."
-              + " tokenInFile=" + t + ", dt=" + dt);
-        }
-      }
+      final Collection<Token<? extends TokenIdentifier>> binaryTokenCollection = binaryCredentials.getAllTokens();
+      if (binaryTokenCollection.size() != 1) {
+        throw new RuntimeException("The token collection read from file ["+tokenFile+"] must have size = 1.");
+      }
+      final Token<? extends TokenIdentifier> binTok = binaryTokenCollection.iterator().next();
+      System.out.println("The token read from binary file: t = [" + binTok + "]");
+      // Verify that dt is same as the token in the file:
+      if (!dt.equals(binTok)) {
+        throw new RuntimeException(
+            "Delegation token in job is not same as the token passed in file:"
+            + " tokenInFile=[" + binTok + "], dt=[" + dt + "].");
+      }
+
+      // Now test the user tokens.
+      final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      // Print all the UGI tokens for diagnostic purposes:
+      final Collection<Token<? extends TokenIdentifier>> ugiTokenCollection = ugi.getTokens();
+      for (Token<? extends TokenIdentifier> t: ugiTokenCollection) {
+        System.out.println("UGI token: [" + t + "]");
+      }
+      final Token<? extends TokenIdentifier> ugiToken
+        = ugi.getCredentials().getToken(new Text(DELEGATION_TOKEN_KEY));
+      if (ugiToken == null) {
+        throw new RuntimeException("Token for key ["+DELEGATION_TOKEN_KEY+"] not found among the UGI tokens.");
+      }
+      if (!ugiToken.equals(binTok)) {
+        throw new RuntimeException(
+            "UGI token is not same as the token passed in binary file:"
+            + " tokenInBinFile=[" + binTok + "], ugiTok=[" + ugiToken + "].");
+      }
 
       super.map(key, value, context);
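
Aside (not part of the patch): the rewritten map() leans on the standard Credentials round-trip, in which tokens are serialized with writeTokenStorageToStream() and recovered with readTokenStorageStream(), plus the fact that Token.equals() compares the token's identifier, password, kind, and service. A minimal self-contained sketch of that round-trip follows; the class and helper names and the File argument are illustrative, only the Credentials/Token calls come from the code above.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    public class TokenRoundTripSketch {
      /** Writes one token under the given alias and reads it back, as the test's mapper expects. */
      static Token<? extends TokenIdentifier> roundTrip(
          Token<? extends TokenIdentifier> token, Text alias, File file) throws IOException {
        final Credentials out = new Credentials();
        out.addToken(alias, token);
        final DataOutputStream os = new DataOutputStream(new FileOutputStream(file));
        try {
          out.writeTokenStorageToStream(os); // same serialization the test uses
        } finally {
          os.close();
        }
        final Credentials in = new Credentials();
        final DataInputStream is = new DataInputStream(new FileInputStream(file));
        try {
          in.readTokenStorageStream(is); // what the mapper does with the binary file
        } finally {
          is.close();
        }
        // Token.equals() compares identifier, password, kind and service,
        // so the returned token should equal the one written.
        return in.getToken(alias);
      }
    }
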
@@ -118,13 +156,20 @@ public class TestBinaryTokenFile {
       TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
           job.getConfiguration());
       for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
-        cred2.addToken(new Text("Hdfs"), t);
+        cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
       }
       DataOutputStream os = new DataOutputStream(new FileOutputStream(
           binaryTokenFileName.toString()));
-      cred2.writeTokenStorageToStream(os);
-      os.close();
-      job.getConfiguration().set("mapreduce.job.credentials.binary",
+      try {
+        cred2.writeTokenStorageToStream(os);
+      } finally {
+        os.close();
+      }
+      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
+          binaryTokenFileName.toString());
+      // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config,
+      // so it's not accessible in the job's config. So, we use another key to pass the file name into the job configuration:
+      job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
           binaryTokenFileName.toString());
     } catch (IOException e) {
       Assert.fail("Exception " + e);
@@ -132,39 +177,53 @@ public class TestBinaryTokenFile {
     }
   }
 
-  private static MiniMRCluster mrCluster;
+  private static MiniMRYarnCluster mrCluster;
   private static MiniDFSCluster dfsCluster;
 
   private static final Path TEST_DIR =
       new Path(System.getProperty("test.build.data","/tmp"));
   private static final Path binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary");
-  private static int numSlaves = 1;
-  private static JobConf jConf;
+
+  private static final int numSlaves = 1; // num of data nodes
+  private static final int noOfNMs = 1;
+
   private static Path p1;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    Configuration conf = new Configuration();
-    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
-    jConf = new JobConf(conf);
-    mrCluster = new MiniMRCluster(0, 0, numSlaves,
-        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
-        jConf);
-    NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();
-    FileSystem fs = dfsCluster.getFileSystem();
+    final Configuration conf = new Configuration();
+
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.set(YarnConfiguration.RM_PRINCIPAL, "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG");
+
+    final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    builder.checkExitOnShutdown(true);
+    builder.numDataNodes(numSlaves);
+    builder.format(true);
+    builder.racks(null);
+    dfsCluster = builder.build();
+
+    mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs);
+    mrCluster.init(conf);
+    mrCluster.start();
+
+    NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();
+
+    FileSystem fs = dfsCluster.getFileSystem();
     p1 = new Path("file1");
     p1 = fs.makeQualified(p1);
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    if(mrCluster != null)
-      mrCluster.shutdown();
+    if(mrCluster != null) {
+      mrCluster.stop();
       mrCluster = null;
-    if(dfsCluster != null)
+    }
+    if(dfsCluster != null) {
       dfsCluster.shutdown();
       dfsCluster = null;
+    }
   }
 
   /**
@@ -173,31 +232,24 @@ public class TestBinaryTokenFile {
    */
   @Test
   public void testBinaryTokenFile() throws IOException {
-    System.out.println("running dist job");
-    // make sure JT starts
-    jConf = mrCluster.createJobConf();
+    Configuration conf = mrCluster.getConfig();
 
     // provide namenodes names for the job to get the delegation tokens for
-    String nnUri = dfsCluster.getURI(0).toString();
-    jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
-    // job tracker principla id..
-    jConf.set(JTConfig.JT_USER_NAME, "jt_id");
+    final String nnUri = dfsCluster.getURI(0).toString();
+    conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
 
     // using argument to pass the file name
-    String[] args = {
+    final String[] args = {
         "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
         };
     int res = -1;
     try {
-      res = ToolRunner.run(jConf, new MySleepJob(), args);
+      res = ToolRunner.run(conf, new MySleepJob(), args);
     } catch (Exception e) {
-      System.out.println("Job failed with" + e.getLocalizedMessage());
+      System.out.println("Job failed with " + e.getLocalizedMessage());
       e.printStackTrace(System.out);
       fail("Job failed");
     }
-    assertEquals("dist job res is not 0", res, 0);
+    assertEquals("dist job res is not 0:", 0, res);
   }
 }
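
Aside (not part of the patch): the setUp()/tearDown() changes follow the usual MR1-to-YARN mini-cluster migration. MiniMRCluster driven by a JobConf is replaced by MiniMRYarnCluster, which is a YARN Service and therefore goes through init(conf)/start()/stop() rather than construction and shutdown(). A hedged sketch of that lifecycle follows; the class name, "MyTest" cluster name, and node counts are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;

    public class MiniClusterLifecycleSketch {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); // run MR on YARN

        final MiniDFSCluster dfs = new MiniDFSCluster.Builder(conf)
            .numDataNodes(1).build();
        final MiniMRYarnCluster mr = new MiniMRYarnCluster("MyTest", 1); // name, #NodeManagers
        mr.init(conf);  // Service lifecycle: init -> start -> (use) -> stop
        mr.start();
        try {
          // submit jobs against mr.getConfig(), as testBinaryTokenFile() does
        } finally {
          mr.stop();      // replaces the old MiniMRCluster.shutdown()
          dfs.shutdown();
        }
      }
    }
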