diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 49556c4edbb..cefefd99ca8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -596,6 +596,8 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4802. Takes a long time to load the task list on the AM for large jobs (Ravi Prakash via bobby) + MAPREDUCE-4764. repair TestBinaryTokenFile (Ivan A. Veselovsky via bobby) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java index e9e779f24f4..b92400dbf6a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java @@ -35,26 +35,28 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.SleepJob; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; -@SuppressWarnings("deprecation") -@Ignore public class TestBinaryTokenFile { + private static final String KEY_SECURITY_TOKEN_FILE_NAME = "key-security-token-file"; + private static final String DELEGATION_TOKEN_KEY = "Hdfs"; + // my sleep class static class MySleepMapper extends SleepJob.SleepMapper { /** @@ -63,29 +65,65 @@ static class MySleepMapper extends SleepJob.SleepMapper { @Override public void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException { - // get token storage and a key - Credentials ts = context.getCredentials(); - Collection> dts = ts.getAllTokens(); + // get context token storage: + final Credentials contextCredentials = context.getCredentials(); - - if(dts.size() != 2) { // one job token and one delegation token - throw new RuntimeException("tokens are not available"); // fail the test + final Collection> contextTokenCollection = contextCredentials.getAllTokens(); + for (Token t : contextTokenCollection) { + System.out.println("Context token: [" + t + "]"); + } + if (contextTokenCollection.size() != 2) { // one job token and one delegation token + // fail the test: + throw new RuntimeException("Exactly 2 tokens are expected in the contextTokenCollection: " + + "one job token and one delegation token, but was found " + contextTokenCollection.size() + " tokens."); } - Token dt = ts.getToken(new Text("Hdfs")); + final Token dt = contextCredentials.getToken(new Text(DELEGATION_TOKEN_KEY)); + if (dt == null) { + throw new RuntimeException("Token for key ["+DELEGATION_TOKEN_KEY+"] not found in the job context."); + } - //Verify that dt is same as the token in the file - String tokenFile = context.getConfiguration().get( - "mapreduce.job.credentials.binary"); - Credentials cred = new Credentials(); - cred.readTokenStorageStream(new DataInputStream(new FileInputStream( + String tokenFile0 = context.getConfiguration().get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); + if (tokenFile0 != null) { + throw new RuntimeException("Token file key ["+MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY+"] found in the configuration. It should have been removed from the configuration."); + } + + final String tokenFile = context.getConfiguration().get(KEY_SECURITY_TOKEN_FILE_NAME); + if (tokenFile == null) { + throw new RuntimeException("Token file key ["+KEY_SECURITY_TOKEN_FILE_NAME+"] not found in the job configuration."); + } + final Credentials binaryCredentials = new Credentials(); + binaryCredentials.readTokenStorageStream(new DataInputStream(new FileInputStream( tokenFile))); - for (Token t : cred.getAllTokens()) { - if (!dt.equals(t)) { - throw new RuntimeException( - "Delegation token in job is not same as the token passed in file." - + " tokenInFile=" + t + ", dt=" + dt); - } + final Collection> binaryTokenCollection = binaryCredentials.getAllTokens(); + if (binaryTokenCollection.size() != 1) { + throw new RuntimeException("The token collection read from file ["+tokenFile+"] must have size = 1."); + } + final Token binTok = binaryTokenCollection.iterator().next(); + System.out.println("The token read from binary file: t = [" + binTok + "]"); + // Verify that dt is same as the token in the file: + if (!dt.equals(binTok)) { + throw new RuntimeException( + "Delegation token in job is not same as the token passed in file:" + + " tokenInFile=[" + binTok + "], dt=[" + dt + "]."); + } + + // Now test the user tokens. + final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + // Print all the UGI tokens for diagnostic purposes: + final Collection> ugiTokenCollection = ugi.getTokens(); + for (Token t: ugiTokenCollection) { + System.out.println("UGI token: [" + t + "]"); + } + final Token ugiToken + = ugi.getCredentials().getToken(new Text(DELEGATION_TOKEN_KEY)); + if (ugiToken == null) { + throw new RuntimeException("Token for key ["+DELEGATION_TOKEN_KEY+"] not found among the UGI tokens."); + } + if (!ugiToken.equals(binTok)) { + throw new RuntimeException( + "UGI token is not same as the token passed in binary file:" + + " tokenInBinFile=[" + binTok + "], ugiTok=[" + ugiToken + "]."); } super.map(key, value, context); @@ -118,13 +156,20 @@ private void setupBinaryTokenFile(Job job) { TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 }, job.getConfiguration()); for (Token t : cred1.getAllTokens()) { - cred2.addToken(new Text("Hdfs"), t); + cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t); } DataOutputStream os = new DataOutputStream(new FileOutputStream( binaryTokenFileName.toString())); - cred2.writeTokenStorageToStream(os); - os.close(); - job.getConfiguration().set("mapreduce.job.credentials.binary", + try { + cred2.writeTokenStorageToStream(os); + } finally { + os.close(); + } + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, + binaryTokenFileName.toString()); + // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config, + // so it's not accessible in the job's config. So, we use another key to pass the file name into the job configuration: + job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME, binaryTokenFileName.toString()); } catch (IOException e) { Assert.fail("Exception " + e); @@ -132,39 +177,53 @@ private void setupBinaryTokenFile(Job job) { } } - private static MiniMRCluster mrCluster; + private static MiniMRYarnCluster mrCluster; private static MiniDFSCluster dfsCluster; + private static final Path TEST_DIR = new Path(System.getProperty("test.build.data","/tmp")); private static final Path binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary"); - private static int numSlaves = 1; - private static JobConf jConf; + + private static final int numSlaves = 1; // num of data nodes + private static final int noOfNMs = 1; + private static Path p1; @BeforeClass public static void setUp() throws Exception { - Configuration conf = new Configuration(); - dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); - jConf = new JobConf(conf); - mrCluster = new MiniMRCluster(0, 0, numSlaves, - dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, - jConf); - - NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads(); - FileSystem fs = dfsCluster.getFileSystem(); + final Configuration conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + conf.set(YarnConfiguration.RM_PRINCIPAL, "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG"); + + final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + builder.checkExitOnShutdown(true); + builder.numDataNodes(numSlaves); + builder.format(true); + builder.racks(null); + dfsCluster = builder.build(); + + mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs); + mrCluster.init(conf); + mrCluster.start(); + + NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads(); + + FileSystem fs = dfsCluster.getFileSystem(); p1 = new Path("file1"); p1 = fs.makeQualified(p1); } @AfterClass public static void tearDown() throws Exception { - if(mrCluster != null) - mrCluster.shutdown(); - mrCluster = null; - if(dfsCluster != null) + if(mrCluster != null) { + mrCluster.stop(); + mrCluster = null; + } + if(dfsCluster != null) { dfsCluster.shutdown(); - dfsCluster = null; + dfsCluster = null; + } } /** @@ -173,31 +232,24 @@ public static void tearDown() throws Exception { */ @Test public void testBinaryTokenFile() throws IOException { - - System.out.println("running dist job"); - - // make sure JT starts - jConf = mrCluster.createJobConf(); + Configuration conf = mrCluster.getConfig(); // provide namenodes names for the job to get the delegation tokens for - String nnUri = dfsCluster.getURI(0).toString(); - jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); - // job tracker principla id.. - jConf.set(JTConfig.JT_USER_NAME, "jt_id"); + final String nnUri = dfsCluster.getURI(0).toString(); + conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); // using argument to pass the file name - String[] args = { + final String[] args = { "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" }; - int res = -1; try { - res = ToolRunner.run(jConf, new MySleepJob(), args); + res = ToolRunner.run(conf, new MySleepJob(), args); } catch (Exception e) { - System.out.println("Job failed with" + e.getLocalizedMessage()); + System.out.println("Job failed with " + e.getLocalizedMessage()); e.printStackTrace(System.out); fail("Job failed"); } - assertEquals("dist job res is not 0", res, 0); + assertEquals("dist job res is not 0:", 0, res); } }