diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 60d92f3323a..4f975cc7d2c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -636,6 +636,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3762. Fixed default CapacityScheduler configs. (mahadev via
     acmurthy)
 
+    MAPREDUCE-3499. New MiniMR does not setup proxyuser configuration
+    correctly, thus tests using doAs do not work. (johnvijoe via tucu)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
index 022590bd9ab..75d01600de0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
@@ -147,7 +147,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
    */
   private boolean getDelegationTokenCalled = false;
   /* notes the renewer that will renew the delegation token */
-  private Text dtRenewer = null;
+  private String dtRenewer = null;
   /* do we need a HS delegation token for this client */
   static final String HS_DELEGATION_TOKEN_REQUIRED =
       "mapreduce.history.server.delegationtoken.required";
@@ -600,7 +600,7 @@ public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
     if (getDelegationTokenCalled) {
       conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
       getDelegationTokenCalled = false;
-      conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer.toString());
+      conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
       dtRenewer = null;
     }
     Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
@@ -1204,7 +1204,7 @@ public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
   public Token<DelegationTokenIdentifier> getDelegationToken(final Text renewer)
       throws IOException, InterruptedException {
     getDelegationTokenCalled = true;
-    dtRenewer = renewer;
+    dtRenewer = renewer.toString();
     return clientUgi.doAs(new
         PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
       public Token<DelegationTokenIdentifier> run() throws IOException,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index da48e9c0249..3f02092d242 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -263,16 +263,20 @@ public long getTaskTrackerExpiryInterval() throws IOException,
   @Override
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   throws IOException, InterruptedException {
-    // JobClient will set this flag if getDelegationToken is called, if so, get
-    // the delegation tokens for the HistoryServer also.
-    if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
-        DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
-      Token<DelegationTokenIdentifier> hsDT = getDelegationTokenFromHS(clientCache.
-          getInitializedHSProxy(), new Text(
-              conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
-      ts.addToken(hsDT.getService(), hsDT);
-    }
+    /* check if we have a hsproxy; if not, there is no need for an HS token */
+    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
+    if (hsProxy != null) {
+      // JobClient will set this flag if getDelegationToken is called, if so, get
+      // the delegation tokens for the HistoryServer also.
+      if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
+          DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
+        Token<DelegationTokenIdentifier> hsDT = getDelegationTokenFromHS(hsProxy, new Text(
+            conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
+        ts.addToken(hsDT.getService(), hsDT);
+      }
+    }
+
     // Upload only in security mode: TODO
     Path applicationTokensFile =
         new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
index 7ecfc671647..c1fa8e0fc16 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
@@ -53,7 +53,7 @@ public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
 
     Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
     fs.copyFromLocalFile(appMasterJar, appJar);
-    fs.setPermission(appJar, new FsPermission("700"));
+    fs.setPermission(appJar, new FsPermission("744"));
 
     Job job = Job.getInstance(conf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 1120413eb7c..bb56bd8e8cd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -24,12 +24,17 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -66,7 +71,27 @@ public MiniMRYarnCluster(String testName, int noOfNMs) {
   public void init(Configuration conf) {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
-        "apps_staging_dir/${user.name}/").getAbsolutePath());
+        "apps_staging_dir/").getAbsolutePath());
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      FileContext fc = FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      // mkdir the staging directory so that the right permissions are set while running as a proxy user
+      fc.mkdir(stagingPath, null, true);
+      // mkdir the done directory as well
+      String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new YarnException("Could not create staging directory.", e);
+    }
     conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                                // which shuffle doesn't happen
     //configure the shuffle service in NM
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMiniMRProxyUser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMiniMRProxyUser.java
new file mode 100644
index 00000000000..41939cdec68
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMiniMRProxyUser.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+
+import java.net.InetAddress;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.security.PrivilegedExceptionAction;
+
+public class TestMiniMRProxyUser extends TestCase {
+
+  private MiniDFSCluster dfsCluster = null;
+  private MiniMRCluster mrCluster = null;
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    if (System.getProperty("hadoop.log.dir") == null) {
+      System.setProperty("hadoop.log.dir", "/tmp");
+    }
+    int taskTrackers = 2;
+    int dataNodes = 2;
+    String proxyUser = System.getProperty("user.name");
+    String proxyGroup = "g";
+    StringBuilder sb = new StringBuilder();
+    sb.append("127.0.0.1,localhost");
+    for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
+      sb.append(",").append(i.getCanonicalHostName());
+    }
+
+    JobConf conf = new JobConf();
+    conf.set("dfs.block.access.token.enable", "false");
+    conf.set("dfs.permissions", "true");
+    conf.set("hadoop.security.authentication", "simple");
+    conf.set("hadoop.proxyuser." + proxyUser + ".hosts", sb.toString());
+    conf.set("hadoop.proxyuser." + proxyUser + ".groups", proxyGroup);
+
+    String[] userGroups = new String[]{proxyGroup};
+    UserGroupInformation.createUserForTesting(proxyUser, userGroups);
+    UserGroupInformation.createUserForTesting("u1", userGroups);
+    UserGroupInformation.createUserForTesting("u2", new String[]{"gg"});
+
+    dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
+    FileSystem fileSystem = dfsCluster.getFileSystem();
+    fileSystem.mkdirs(new Path("/tmp"));
+    fileSystem.mkdirs(new Path("/user"));
+    fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
+    fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
+    fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
+    fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
+    String nnURI = fileSystem.getUri().toString();
+    int numDirs = 1;
+    String[] racks = null;
+    String[] hosts = null;
+    mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
+    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+  }
+
+  protected JobConf getJobConf() {
+    return mrCluster.createJobConf();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+    super.tearDown();
+  }
+
+  private void mrRun() throws Exception {
+    FileSystem fs = FileSystem.get(getJobConf());
+    Path inputDir = new Path("input");
+    fs.mkdirs(inputDir);
+    Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+    writer.write("hello");
+    writer.close();
+
+    Path outputDir = new Path("output", "output");
+
+    JobConf jobConf = new JobConf(getJobConf());
+    jobConf.setInt("mapred.map.tasks", 1);
+ jobConf.setInt("mapred.map.max.attempts", 1); + jobConf.setInt("mapred.reduce.max.attempts", 1); + jobConf.set("mapred.input.dir", inputDir.toString()); + jobConf.set("mapred.output.dir", outputDir.toString()); + + JobClient jobClient = new JobClient(jobConf); + RunningJob runJob = jobClient.submitJob(jobConf); + runJob.waitForCompletion(); + assertTrue(runJob.isComplete()); + assertTrue(runJob.isSuccessful()); + } + + public void __testCurrentUser() throws Exception { + mrRun(); + } + + public void testValidProxyUser() throws Exception { + UserGroupInformation ugi = UserGroupInformation.createProxyUser("u1", UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + public Void run() throws Exception { + mrRun(); + return null; + } + + + }); + } + + public void ___testInvalidProxyUser() throws Exception { + UserGroupInformation ugi = UserGroupInformation.createProxyUser("u2", UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + public Void run() throws Exception { + try { + mrRun(); + fail(); + } + catch (RemoteException ex) { + //nop + } + catch (Exception ex) { + fail(); + } + return null; + } + }); + } +} +