MAPREDUCE-3499. New MiniMR does not setup proxyuser configuration correctly, thus tests using doAs do not work. (johnvijoe via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1239207 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-02-01 17:03:27 +00:00
parent f49ba355e5
commit e3d555a656
6 changed files with 209 additions and 14 deletions

View File

@ -636,6 +636,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3762. Fixed default CapacityScheduler configs. (mahadev via
acmurthy)
MAPREDUCE-3499. New MiniMR does not setup proxyuser configuration
correctly, thus tests using doAs do not work. (johnvijoe via tucu)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -147,7 +147,7 @@ public class JobClient extends CLI {
*/
private boolean getDelegationTokenCalled = false;
/* notes the renewer that will renew the delegation token */
private Text dtRenewer = null;
private String dtRenewer = null;
/* do we need a HS delegation token for this client */
static final String HS_DELEGATION_TOKEN_REQUIRED
= "mapreduce.history.server.delegationtoken.required";
@ -600,7 +600,7 @@ public class JobClient extends CLI {
if (getDelegationTokenCalled) {
conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
getDelegationTokenCalled = false;
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer.toString());
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
dtRenewer = null;
}
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@ -1204,7 +1204,7 @@ public class JobClient extends CLI {
public Token<DelegationTokenIdentifier>
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
getDelegationTokenCalled = true;
dtRenewer = renewer;
dtRenewer = renewer.toString();
return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException,

View File

@ -263,14 +263,18 @@ public class YARNRunner implements ClientProtocol {
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
// JobClient will set this flag if getDelegationToken is called, if so, get
// the delegation tokens for the HistoryServer also.
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
Token hsDT = getDelegationTokenFromHS(clientCache.
getInitializedHSProxy(), new Text(
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
ts.addToken(hsDT.getService(), hsDT);
/* check if we have a hsproxy, if not, no need */
MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
if (hsProxy != null) {
// JobClient will set this flag if getDelegationToken is called, if so, get
// the delegation tokens for the HistoryServer also.
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
ts.addToken(hsDT.getService(), hsDT);
}
}
// Upload only in security mode: TODO

View File

@ -53,7 +53,7 @@ public class MiniMRClientClusterFactory {
Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
fs.copyFromLocalFile(appMasterJar, appJar);
fs.setPermission(appJar, new FsPermission("700"));
fs.setPermission(appJar, new FsPermission("744"));
Job job = Job.getInstance(conf);

View File

@ -24,12 +24,17 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -66,7 +71,27 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
public void init(Configuration conf) {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir/${user.name}/").getAbsolutePath());
"apps_staging_dir/").getAbsolutePath());
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
//mkdir the staging directory so that right permissions are set while running as proxy user
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new YarnException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
//configure the shuffle service in NM

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import java.net.InetAddress;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
public class TestMiniMRProxyUser extends TestCase {
private MiniDFSCluster dfsCluster = null;
private MiniMRCluster mrCluster = null;
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "/tmp");
}
int taskTrackers = 2;
int dataNodes = 2;
String proxyUser = System.getProperty("user.name");
String proxyGroup = "g";
StringBuilder sb = new StringBuilder();
sb.append("127.0.0.1,localhost");
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
sb.append(",").append(i.getCanonicalHostName());
}
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
conf.set("hadoop.proxyuser." + proxyUser + ".hosts", sb.toString());
conf.set("hadoop.proxyuser." + proxyUser + ".groups", proxyGroup);
String[] userGroups = new String[]{proxyGroup};
UserGroupInformation.createUserForTesting(proxyUser, userGroups);
UserGroupInformation.createUserForTesting("u1", userGroups);
UserGroupInformation.createUserForTesting("u2", new String[]{"gg"});
dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
String nnURI = fileSystem.getUri().toString();
int numDirs = 1;
String[] racks = null;
String[] hosts = null;
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
protected JobConf getJobConf() {
return mrCluster.createJobConf();
}
@Override
protected void tearDown() throws Exception {
if (mrCluster != null) {
mrCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
super.tearDown();
}
private void mrRun() throws Exception {
FileSystem fs = FileSystem.get(getJobConf());
Path inputDir = new Path("input");
fs.mkdirs(inputDir);
Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
writer.write("hello");
writer.close();
Path outputDir = new Path("output", "output");
JobConf jobConf = new JobConf(getJobConf());
jobConf.setInt("mapred.map.tasks", 1);
jobConf.setInt("mapred.map.max.attempts", 1);
jobConf.setInt("mapred.reduce.max.attempts", 1);
jobConf.set("mapred.input.dir", inputDir.toString());
jobConf.set("mapred.output.dir", outputDir.toString());
JobClient jobClient = new JobClient(jobConf);
RunningJob runJob = jobClient.submitJob(jobConf);
runJob.waitForCompletion();
assertTrue(runJob.isComplete());
assertTrue(runJob.isSuccessful());
}
public void __testCurrentUser() throws Exception {
mrRun();
}
public void testValidProxyUser() throws Exception {
UserGroupInformation ugi = UserGroupInformation.createProxyUser("u1", UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
mrRun();
return null;
}
});
}
public void ___testInvalidProxyUser() throws Exception {
UserGroupInformation ugi = UserGroupInformation.createProxyUser("u2", UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
try {
mrRun();
fail();
}
catch (RemoteException ex) {
//nop
}
catch (Exception ex) {
fail();
}
return null;
}
});
}
}