Merge -r 1239206:1239207 from trunk to branch. FIXES: MAPREDUCE-3499
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1239210 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
999708d065
commit
239604718b
|
@ -591,6 +591,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3762. Fixed default CapacityScheduler configs. (mahadev via
|
MAPREDUCE-3762. Fixed default CapacityScheduler configs. (mahadev via
|
||||||
acmurthy)
|
acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-3499. New MiniMR does not setup proxyuser configuration
|
||||||
|
correctly, thus tests using doAs do not work. (johnvijoe via tucu)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class JobClient extends CLI {
|
||||||
*/
|
*/
|
||||||
private boolean getDelegationTokenCalled = false;
|
private boolean getDelegationTokenCalled = false;
|
||||||
/* notes the renewer that will renew the delegation token */
|
/* notes the renewer that will renew the delegation token */
|
||||||
private Text dtRenewer = null;
|
private String dtRenewer = null;
|
||||||
/* do we need a HS delegation token for this client */
|
/* do we need a HS delegation token for this client */
|
||||||
static final String HS_DELEGATION_TOKEN_REQUIRED
|
static final String HS_DELEGATION_TOKEN_REQUIRED
|
||||||
= "mapreduce.history.server.delegationtoken.required";
|
= "mapreduce.history.server.delegationtoken.required";
|
||||||
|
@ -600,7 +600,7 @@ public class JobClient extends CLI {
|
||||||
if (getDelegationTokenCalled) {
|
if (getDelegationTokenCalled) {
|
||||||
conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
|
conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
|
||||||
getDelegationTokenCalled = false;
|
getDelegationTokenCalled = false;
|
||||||
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer.toString());
|
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
|
||||||
dtRenewer = null;
|
dtRenewer = null;
|
||||||
}
|
}
|
||||||
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
|
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
|
||||||
|
@ -1202,7 +1202,7 @@ public class JobClient extends CLI {
|
||||||
public Token<DelegationTokenIdentifier>
|
public Token<DelegationTokenIdentifier>
|
||||||
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
|
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
|
||||||
getDelegationTokenCalled = true;
|
getDelegationTokenCalled = true;
|
||||||
dtRenewer = renewer;
|
dtRenewer = renewer.toString();
|
||||||
return clientUgi.doAs(new
|
return clientUgi.doAs(new
|
||||||
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
|
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
|
||||||
public Token<DelegationTokenIdentifier> run() throws IOException,
|
public Token<DelegationTokenIdentifier> run() throws IOException,
|
||||||
|
|
|
@ -263,15 +263,19 @@ public class YARNRunner implements ClientProtocol {
|
||||||
@Override
|
@Override
|
||||||
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
|
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
/* check if we have a hsproxy, if not, no need */
|
||||||
|
MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
|
||||||
|
if (hsProxy != null) {
|
||||||
// JobClient will set this flag if getDelegationToken is called, if so, get
|
// JobClient will set this flag if getDelegationToken is called, if so, get
|
||||||
// the delegation tokens for the HistoryServer also.
|
// the delegation tokens for the HistoryServer also.
|
||||||
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
|
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
|
||||||
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
|
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
|
||||||
Token hsDT = getDelegationTokenFromHS(clientCache.
|
Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
|
||||||
getInitializedHSProxy(), new Text(
|
|
||||||
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
|
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
|
||||||
ts.addToken(hsDT.getService(), hsDT);
|
ts.addToken(hsDT.getService(), hsDT);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Upload only in security mode: TODO
|
// Upload only in security mode: TODO
|
||||||
Path applicationTokensFile =
|
Path applicationTokensFile =
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class MiniMRClientClusterFactory {
|
||||||
Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
|
Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
|
||||||
|
|
||||||
fs.copyFromLocalFile(appMasterJar, appJar);
|
fs.copyFromLocalFile(appMasterJar, appJar);
|
||||||
fs.setPermission(appJar, new FsPermission("700"));
|
fs.setPermission(appJar, new FsPermission("744"));
|
||||||
|
|
||||||
Job job = Job.getInstance(conf);
|
Job job = Job.getInstance(conf);
|
||||||
|
|
||||||
|
|
|
@ -24,12 +24,17 @@ import java.io.IOException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.mapred.LocalContainerLauncher;
|
import org.apache.hadoop.mapred.LocalContainerLauncher;
|
||||||
import org.apache.hadoop.mapred.ShuffleHandler;
|
import org.apache.hadoop.mapred.ShuffleHandler;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.util.JarFinder;
|
import org.apache.hadoop.util.JarFinder;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -66,7 +71,27 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
|
||||||
"apps_staging_dir/${user.name}/").getAbsolutePath());
|
"apps_staging_dir/").getAbsolutePath());
|
||||||
|
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
|
||||||
|
|
||||||
|
try {
|
||||||
|
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
|
||||||
|
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
|
||||||
|
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
|
||||||
|
if (fc.util().exists(stagingPath)) {
|
||||||
|
LOG.info(stagingPath + " exists! deleting...");
|
||||||
|
fc.delete(stagingPath, true);
|
||||||
|
}
|
||||||
|
LOG.info("mkdir: " + stagingPath);
|
||||||
|
//mkdir the staging directory so that right permissions are set while running as proxy user
|
||||||
|
fc.mkdir(stagingPath, null, true);
|
||||||
|
//mkdir done directory as well
|
||||||
|
String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
|
||||||
|
Path doneDirPath = fc.makeQualified(new Path(doneDir));
|
||||||
|
fc.mkdir(doneDirPath, null, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new YarnException("Could not create staging directory. ", e);
|
||||||
|
}
|
||||||
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
|
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
|
||||||
// which shuffle doesn't happen
|
// which shuffle doesn't happen
|
||||||
//configure the shuffle service in NM
|
//configure the shuffle service in NM
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.v2;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.mapred.JobClient;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||||
|
import org.apache.hadoop.mapred.RunningJob;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.io.Writer;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
public class TestMiniMRProxyUser extends TestCase {
|
||||||
|
|
||||||
|
private MiniDFSCluster dfsCluster = null;
|
||||||
|
private MiniMRCluster mrCluster = null;
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
if (System.getProperty("hadoop.log.dir") == null) {
|
||||||
|
System.setProperty("hadoop.log.dir", "/tmp");
|
||||||
|
}
|
||||||
|
int taskTrackers = 2;
|
||||||
|
int dataNodes = 2;
|
||||||
|
String proxyUser = System.getProperty("user.name");
|
||||||
|
String proxyGroup = "g";
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("127.0.0.1,localhost");
|
||||||
|
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
|
||||||
|
sb.append(",").append(i.getCanonicalHostName());
|
||||||
|
}
|
||||||
|
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set("dfs.block.access.token.enable", "false");
|
||||||
|
conf.set("dfs.permissions", "true");
|
||||||
|
conf.set("hadoop.security.authentication", "simple");
|
||||||
|
conf.set("hadoop.proxyuser." + proxyUser + ".hosts", sb.toString());
|
||||||
|
conf.set("hadoop.proxyuser." + proxyUser + ".groups", proxyGroup);
|
||||||
|
|
||||||
|
String[] userGroups = new String[]{proxyGroup};
|
||||||
|
UserGroupInformation.createUserForTesting(proxyUser, userGroups);
|
||||||
|
UserGroupInformation.createUserForTesting("u1", userGroups);
|
||||||
|
UserGroupInformation.createUserForTesting("u2", new String[]{"gg"});
|
||||||
|
|
||||||
|
dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
|
||||||
|
FileSystem fileSystem = dfsCluster.getFileSystem();
|
||||||
|
fileSystem.mkdirs(new Path("/tmp"));
|
||||||
|
fileSystem.mkdirs(new Path("/user"));
|
||||||
|
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
|
||||||
|
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
|
||||||
|
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
|
||||||
|
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
|
||||||
|
String nnURI = fileSystem.getUri().toString();
|
||||||
|
int numDirs = 1;
|
||||||
|
String[] racks = null;
|
||||||
|
String[] hosts = null;
|
||||||
|
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
|
||||||
|
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected JobConf getJobConf() {
|
||||||
|
return mrCluster.createJobConf();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
if (mrCluster != null) {
|
||||||
|
mrCluster.shutdown();
|
||||||
|
}
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mrRun() throws Exception {
|
||||||
|
FileSystem fs = FileSystem.get(getJobConf());
|
||||||
|
Path inputDir = new Path("input");
|
||||||
|
fs.mkdirs(inputDir);
|
||||||
|
Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
|
||||||
|
writer.write("hello");
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
Path outputDir = new Path("output", "output");
|
||||||
|
|
||||||
|
JobConf jobConf = new JobConf(getJobConf());
|
||||||
|
jobConf.setInt("mapred.map.tasks", 1);
|
||||||
|
jobConf.setInt("mapred.map.max.attempts", 1);
|
||||||
|
jobConf.setInt("mapred.reduce.max.attempts", 1);
|
||||||
|
jobConf.set("mapred.input.dir", inputDir.toString());
|
||||||
|
jobConf.set("mapred.output.dir", outputDir.toString());
|
||||||
|
|
||||||
|
JobClient jobClient = new JobClient(jobConf);
|
||||||
|
RunningJob runJob = jobClient.submitJob(jobConf);
|
||||||
|
runJob.waitForCompletion();
|
||||||
|
assertTrue(runJob.isComplete());
|
||||||
|
assertTrue(runJob.isSuccessful());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void __testCurrentUser() throws Exception {
|
||||||
|
mrRun();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidProxyUser() throws Exception {
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createProxyUser("u1", UserGroupInformation.getLoginUser());
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
public Void run() throws Exception {
|
||||||
|
mrRun();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ___testInvalidProxyUser() throws Exception {
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createProxyUser("u2", UserGroupInformation.getLoginUser());
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
public Void run() throws Exception {
|
||||||
|
try {
|
||||||
|
mrRun();
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
catch (RemoteException ex) {
|
||||||
|
//nop
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue