diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5ba3f367379..89afe5c4d0e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -841,6 +841,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4570. ProcfsBasedProcessTree#constructProcessInfo() prints a warning if procfsDir//stat is not found. (Ahmed Radwan via bobby) + MAPREDUCE-4600. TestTokenCache.java from MRV1 no longer compiles (daryn + via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java deleted file mode 100644 index 293645a1d85..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java +++ /dev/null @@ -1,459 +0,0 @@ -/** Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapreduce.security; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.NoSuchAlgorithmException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.crypto.KeyGenerator; -import javax.crypto.spec.SecretKeySpec; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.viewfs.ViewFileSystem; -import org.apache.hadoop.hdfs.HftpFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Master; -import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.SleepJob; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.tools.HadoopArchives; -import org.apache.hadoop.util.ToolRunner; -import org.codehaus.jackson.map.ObjectMapper; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -public class TestTokenCache { - private static final int NUM_OF_KEYS = 10; - - // my sleep class - adds check for tokenCache - static class MySleepMapper extends SleepJob.SleepMapper { - /** - * attempts to access tokenCache as from client - */ - @Override - public void map(IntWritable key, IntWritable value, Context context) - throws IOException, InterruptedException { - // get token storage and a key - Credentials ts = context.getCredentials(); - byte[] key1 = ts.getSecretKey(new Text("alias1")); - Collection> dts = ts.getAllTokens(); - int dts_size = 0; - if(dts != null) - dts_size = dts.size(); - - - if(dts_size != 2) { // one job token and one delegation token - throw new RuntimeException("tokens are not available"); // fail the test - } - - - if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) { - throw new RuntimeException("secret keys are not available"); // fail the test - } - super.map(key, value, context); - } - } - - class MySleepJob extends SleepJob { - @Override - public Job createJob(int numMapper, int numReducer, - long mapSleepTime, int mapSleepCount, - long reduceSleepTime, int reduceSleepCount) - throws IOException { - Job job = super.createJob(numMapper, numReducer, - mapSleepTime, mapSleepCount, - reduceSleepTime, reduceSleepCount); - - job.setMapperClass(MySleepMapper.class); - //Populate tokens here because security is disabled. - populateTokens(job); - return job; - } - - private void populateTokens(Job job) { - // Credentials in the job will not have delegation tokens - // because security is disabled. Fetch delegation tokens - // and populate the credential in the job. - try { - Credentials ts = job.getCredentials(); - Path p1 = new Path("file1"); - p1 = p1.getFileSystem(job.getConfiguration()).makeQualified(p1); - Credentials cred = new Credentials(); - TokenCache.obtainTokensForNamenodesInternal(cred, new Path[] { p1 }, - job.getConfiguration()); - for (Token t : cred.getAllTokens()) { - ts.addToken(new Text("Hdfs"), t); - } - } catch (IOException e) { - Assert.fail("Exception " + e); - } - } - } - - private static MiniMRCluster mrCluster; - private static MiniDFSCluster dfsCluster; - private static final Path TEST_DIR = - new Path(System.getProperty("test.build.data","/tmp"), "sleepTest"); - private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json"); - private static int numSlaves = 1; - private static JobConf jConf; - private static ObjectMapper mapper = new ObjectMapper(); - private static Path p1; - private static Path p2; - - @BeforeClass - public static void setUp() throws Exception { - - Configuration conf = new Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL, "RULE:[2:$1]"); - dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); - jConf = new JobConf(conf); - mrCluster = new MiniMRCluster(0, 0, numSlaves, - dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, - jConf); - - createTokenFileJson(); - verifySecretKeysInJSONFile(); - NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads(); - FileSystem fs = dfsCluster.getFileSystem(); - - p1 = new Path("file1"); - p2 = new Path("file2"); - - p1 = fs.makeQualified(p1); - } - - @AfterClass - public static void tearDown() throws Exception { - if(mrCluster != null) - mrCluster.shutdown(); - mrCluster = null; - if(dfsCluster != null) - dfsCluster.shutdown(); - dfsCluster = null; - } - - // create jason file and put some keys into it.. - private static void createTokenFileJson() throws IOException { - Map map = new HashMap(); - - try { - KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1"); - for(int i=0; i map; - map = mapper.readValue(new File(tokenFileName.toString()), Map.class); - assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS); - } - - /** - * run a distributed job and verify that TokenCache is available - * @throws IOException - */ - @Test - public void testTokenCache() throws IOException { - - System.out.println("running dist job"); - - // make sure JT starts - jConf = mrCluster.createJobConf(); - - // provide namenodes names for the job to get the delegation tokens for - String nnUri = dfsCluster.getURI(0).toString(); - jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); - // job tracker principla id.. - jConf.set(JTConfig.JT_USER_NAME, "jt_id/foo@BAR"); - - // using argument to pass the file name - String[] args = { - "-tokenCacheFile", tokenFileName.toString(), - "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" - }; - - int res = -1; - try { - res = ToolRunner.run(jConf, new MySleepJob(), args); - } catch (Exception e) { - System.out.println("Job failed with" + e.getLocalizedMessage()); - e.printStackTrace(System.out); - fail("Job failed"); - } - assertEquals("dist job res is not 0", res, 0); - } - - /** - * run a local job and verify that TokenCache is available - * @throws NoSuchAlgorithmException - * @throws IOException - */ - @Test - public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException { - - System.out.println("running local job"); - // this is local job - String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; - jConf.set("mapreduce.job.credentials.json", tokenFileName.toString()); - - int res = -1; - try { - res = ToolRunner.run(jConf, new MySleepJob(), args); - } catch (Exception e) { - System.out.println("Job failed with" + e.getLocalizedMessage()); - e.printStackTrace(System.out); - fail("local Job failed"); - } - assertEquals("local job res is not 0", res, 0); - } - - @Test - public void testGetTokensForNamenodes() throws IOException { - - Credentials credentials = new Credentials(); - TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1, - p2 }, jConf); - - // this token is keyed by hostname:port key. - String fs_addr = - SecurityUtil.buildDTServiceName(p1.toUri(), NameNode.DEFAULT_PORT); - Token nnt = (Token)TokenCache.getDelegationToken( - credentials, fs_addr); - System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " + nnt); - assertNotNull("Token for nn is null", nnt); - - // verify the size - Collection> tns = credentials.getAllTokens(); - assertEquals("number of tokens is not 1", 1, tns.size()); - - boolean found = false; - for(Token t: tns) { - if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) && - t.getService().equals(new Text(fs_addr))) { - found = true; - } - assertTrue("didn't find token for " + p1 ,found); - } - } - - @Test - public void testGetTokensForHftpFS() throws IOException, URISyntaxException { - HftpFileSystem hfs = mock(HftpFileSystem.class); - - DelegationTokenSecretManager dtSecretManager = - NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()); - String renewer = "renewer"; - jConf.set(JTConfig.JT_USER_NAME,renewer); - DelegationTokenIdentifier dtId = - new DelegationTokenIdentifier(new Text("user"), new Text(renewer), null); - final Token t = - new Token(dtId, dtSecretManager); - - final URI uri = new URI("hftp://127.0.0.1:2222/file1"); - final String fs_addr = - SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT); - t.setService(new Text(fs_addr)); - - //when(hfs.getUri()).thenReturn(uri); - Mockito.doAnswer(new Answer(){ - @Override - public URI answer(InvocationOnMock invocation) - throws Throwable { - return uri; - }}).when(hfs).getUri(); - - //when(hfs.getDelegationToken()).thenReturn((Token) t); - Mockito.doAnswer(new Answer>(){ - @Override - public Token answer(InvocationOnMock invocation) - throws Throwable { - return t; - }}).when(hfs).getDelegationToken(renewer); - - //when(hfs.getDelegationTokens()).thenReturn((Token) t); - Mockito.doAnswer(new Answer>>(){ - @Override - public List> answer(InvocationOnMock invocation) - throws Throwable { - return Collections.singletonList(t); - }}).when(hfs).getDelegationTokens(renewer); - - //when(hfs.getCanonicalServiceName).thenReturn(fs_addr); - Mockito.doAnswer(new Answer(){ - @Override - public String answer(InvocationOnMock invocation) - throws Throwable { - return fs_addr; - }}).when(hfs).getCanonicalServiceName(); - - Credentials credentials = new Credentials(); - Path p = new Path(uri.toString()); - System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr + "; rn=" + renewer); - TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf); - - Collection> tns = credentials.getAllTokens(); - assertEquals("number of tokens is not 1", 1, tns.size()); - - boolean found = false; - for(Token tt: tns) { - System.out.println("token="+tt); - if(tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) && - tt.getService().equals(new Text(fs_addr))) { - found = true; - assertEquals("different token", tt, t); - } - assertTrue("didn't find token for " + p, found); - } - } - - /** - * verify _HOST substitution - * @throws IOException - */ - @Test - public void testGetJTPrincipal() throws IOException { - String serviceName = "jt/"; - String hostName = "foo"; - String domainName = "@BAR"; - Configuration conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); - conf.set(JTConfig.JT_IPC_ADDRESS, hostName + ":8888"); - conf.set(JTConfig.JT_USER_NAME, serviceName + SecurityUtil.HOSTNAME_PATTERN - + domainName); - assertEquals("Failed to substitute HOSTNAME_PATTERN with hostName", - serviceName + hostName + domainName, Master.getMasterPrincipal(conf)); - } - - @Test - public void testGetTokensForViewFS() throws IOException, URISyntaxException { - Configuration conf = new Configuration(jConf); - FileSystem dfs = dfsCluster.getFileSystem(); - String serviceName = dfs.getCanonicalServiceName(); - - Path p1 = new Path("/mount1"); - Path p2 = new Path("/mount2"); - p1 = dfs.makeQualified(p1); - p2 = dfs.makeQualified(p2); - - conf.set("fs.viewfs.mounttable.default.link./dir1", p1.toString()); - conf.set("fs.viewfs.mounttable.default.link./dir2", p2.toString()); - Credentials credentials = new Credentials(); - Path lp1 = new Path("viewfs:///dir1"); - Path lp2 = new Path("viewfs:///dir2"); - Path[] paths = new Path[2]; - paths[0] = lp1; - paths[1] = lp2; - TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf); - - Collection> tns = - credentials.getAllTokens(); - assertEquals("number of tokens is not 1", 1, tns.size()); - - boolean found = false; - for (Token tt : tns) { - System.out.println("token=" + tt); - if (tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) - && tt.getService().equals(new Text(serviceName))) { - found = true; - } - assertTrue("didn't find token for [" + lp1 + ", " + lp2 + "]", found); - } - } - - @Test - public void testGetTokensForUriWithoutAuth() throws IOException { - FileSystem fs = dfsCluster.getFileSystem(); - HadoopArchives har = new HadoopArchives(jConf); - Path archivePath = new Path(fs.getHomeDirectory(), "tmp"); - String[] args = new String[6]; - args[0] = "-archiveName"; - args[1] = "foo1.har"; - args[2] = "-p"; - args[3] = fs.getHomeDirectory().toString(); - args[4] = "test"; - args[5] = archivePath.toString(); - try { - int ret = ToolRunner.run(har, args); - } catch (Exception e) { - fail("Could not create har file"); - } - Path finalPath = new Path(archivePath, "foo1.har"); - Path filePath = new Path(finalPath, "test"); - - Credentials credentials = new Credentials(); - TokenCache.obtainTokensForNamenodesInternal( - credentials, new Path [] {finalPath}, jConf); - } -}