MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use java.io.File.createTempFile to create temporary files as part of their tasks. Contributed by Jonathan Eagles.

svn merge --ignore-ancestry -c 1238136 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1238138 13f79535-47bb-0310-9956-ffa450edef68
commit 3861f9f50d
parent fc432d7f33
Author: Vinod Kumar Vavilapalli
Date:   2012-01-31 02:24:54 +00:00

8 changed files with 139 additions and 111 deletions
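For context, the change wires three layers together: the container executors (DefaultContainerExecutor and the native container-executor) now pre-create a tmp directory under each container's work dir, YarnConfiguration names that directory, and MapReduceChildJVM points the child JVM's java.io.tmpdir at it. A minimal sketch of the old-API user code this enables (not part of this commit; the class and output wiring are illustrative):

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class TempFileMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
      public void map(LongWritable key, Text value,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {
        // Lands under the container work dir's ./tmp, because the child JVM
        // is launched with -Djava.io.tmpdir pointing there and the executor
        // now creates that directory before launch.
        File scratch = File.createTempFile("task", ".tmp");
        scratch.deleteOnExit();
        output.collect(new Text(scratch.getName()), new IntWritable(1));
      }
    }

Before this change the property pointed at a ./tmp that nothing created, so the createTempFile call above failed inside YARN containers.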

CHANGES.txt

@@ -566,6 +566,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3703. ResourceManager should provide node lists in JMX output.
     (Eric Payne via mahadev)
 
+    MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use
+    java.io.File.createTempFile to create temporary files as part of their
+    tasks. (Jonathan Eagles via vinodkv)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

MapReduceChildJVM.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Apps;
 
 @SuppressWarnings("deprecation")
@@ -201,7 +202,8 @@ public class MapReduceChildJVM {
         vargs.add(javaOptsSplit[i]);
       }
 
-    String childTmpDir = Environment.PWD.$() + Path.SEPARATOR + "tmp";
+    Path childTmpDir = new Path(Environment.PWD.$(),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
     // Setup the log4j prop
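The key change above: the child's tmp dir is no longer assembled by string concatenation but derived from YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR, the same constant the NodeManager-side executor uses when it creates the directory. A rough sketch of the contract the two sides now share (a hypothetical condensation, not the actual Hadoop classes):

    import java.io.File;
    import java.io.IOException;

    public class TmpDirContract {
      // Mirrors YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR ("./tmp").
      static final String CONTAINER_TEMP_DIR = "tmp";

      // Executor side: pre-create <workDir>/tmp before launching the task.
      static File prepareWorkDir(File containerWorkDir) throws IOException {
        File tmp = new File(containerWorkDir, CONTAINER_TEMP_DIR);
        if (!tmp.isDirectory() && !tmp.mkdirs()) {
          throw new IOException("could not create " + tmp);
        }
        return tmp;
      }

      // MR side: the flag the child JVM is launched with; $PWD is expanded
      // to the container work dir when the launch script runs.
      static String childJvmTmpDirArg() {
        return "-Djava.io.tmpdir=$PWD/" + CONTAINER_TEMP_DIR;
      }

      public static void main(String[] args) throws IOException {
        File tmp = prepareWorkDir(new File("."));
        System.out.println(childJvmTmpDirArg() + "  (created " + tmp + ")");
      }
    }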

TestMiniMRChildTask.java

@@ -17,10 +17,17 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.*;
 import java.util.Iterator;
 
-import junit.framework.TestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,20 +36,21 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Ignore;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 
 /**
  * Class to test mapred task's
  *   - temp directory
  *   - child env
  */
-@Ignore
-public class TestMiniMRChildTask extends TestCase {
+public class TestMiniMRChildTask {
   private static final Log LOG =
     LogFactory.getLog(TestMiniMRChildTask.class.getName());
@@ -51,9 +59,23 @@ public class TestMiniMRChildTask extends TestCase {
   private final static String MAP_OPTS_VAL = "-Xmx200m";
   private final static String REDUCE_OPTS_VAL = "-Xmx300m";
 
-  private MiniMRCluster mr;
-  private MiniDFSCluster dfs;
-  private FileSystem fileSys;
+  private static MiniMRYarnCluster mr;
+  private static MiniDFSCluster dfs;
+  private static FileSystem fileSys;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static Path TEST_ROOT_DIR = new Path("target",
+      TestMiniMRChildTask.class.getName() + "-tmpDir").makeQualified(localFs);
+  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
 
   /**
    * Map class which checks whether temp directory exists
@@ -62,34 +84,26 @@ public class TestMiniMRChildTask extends TestCase {
    * temp directory specified.
    */
   public static class MapClass extends MapReduceBase
   implements Mapper<LongWritable, Text, Text, IntWritable> {
       Path tmpDir;
-      FileSystem localFs;
-      public void map (LongWritable key, Text value,
-                       OutputCollector<Text, IntWritable> output,
-                       Reporter reporter) throws IOException {
-        String tmp = null;
-        if (localFs.exists(tmpDir)) {
-          tmp = tmpDir.makeQualified(localFs).toString();
-
-          assertEquals(tmp, new Path(System.getProperty("java.io.tmpdir")).
-                            makeQualified(localFs).toString());
-        } else {
-          fail("Temp directory "+tmpDir +" doesnt exist.");
-        }
-        File tmpFile = File.createTempFile("test", ".tmp");
-        assertEquals(tmp, new Path(tmpFile.getParent()).
-                          makeQualified(localFs).toString());
-      }
+
+      public void map (LongWritable key, Text value,
+          OutputCollector<Text, IntWritable> output,
+          Reporter reporter) throws IOException {
+        if (localFs.exists(tmpDir)) {
+        } else {
+          fail("Temp directory " + tmpDir +" doesnt exist.");
+        }
+        File tmpFile = File.createTempFile("test", ".tmp");
+      }
       public void configure(JobConf job) {
-        tmpDir = new Path(job.get(JobContext.TASK_TEMP_DIR, "./tmp"));
+        tmpDir = new Path(System.getProperty("java.io.tmpdir"));
         try {
           localFs = FileSystem.getLocal(job);
         } catch (IOException ioe) {
           ioe.printStackTrace();
           fail("IOException in getting localFS");
         }
       }
   }
 
   // configure a job
@@ -136,7 +150,7 @@ public class TestMiniMRChildTask extends TestCase {
                          Path inDir,
                          Path outDir,
                          String input)
-  throws IOException {
+  throws IOException, InterruptedException, ClassNotFoundException {
     configure(conf, inDir, outDir, input,
               MapClass.class, IdentityReducer.class);
@@ -144,48 +158,13 @@
     // Launch job with default option for temp dir.
     // i.e. temp dir is ./tmp
-    JobClient.runJob(conf);
-    outFs.delete(outDir, true);
-
-    final String DEFAULT_ABS_TMP_PATH = "/tmp";
-    final String DEFAULT_REL_TMP_PATH = "../temp";
-
-    String absoluteTempPath = null;
-    String relativeTempPath = null;
-
-    for (String key : new String[] { "test.temp.dir", "test.tmp.dir" }) {
-      String p = conf.get(key);
-      if (p == null || p.isEmpty()) {
-        continue;
-      }
-      if (new Path(p).isAbsolute()) {
-        if (absoluteTempPath == null) {
-          absoluteTempPath = p;
-        }
-      } else {
-        if (relativeTempPath == null) {
-          relativeTempPath = p;
-        }
-      }
-    }
-
-    if (absoluteTempPath == null) {
-      absoluteTempPath = DEFAULT_ABS_TMP_PATH;
-    }
-    if (relativeTempPath == null) {
-      relativeTempPath = DEFAULT_REL_TMP_PATH;
-    }
-
-    // Launch job by giving relative path to temp dir.
-    LOG.info("Testing with relative temp dir = "+relativeTempPath);
-    conf.set("mapred.child.tmp", relativeTempPath);
-    JobClient.runJob(conf);
-    outFs.delete(outDir, true);
-
-    // Launch job by giving absolute path to temp dir
-    LOG.info("Testing with absolute temp dir = "+absoluteTempPath);
-    conf.set("mapred.child.tmp", absoluteTempPath);
-    JobClient.runJob(conf);
+    Job job = new Job(conf);
+    job.addFileToClassPath(APP_JAR);
+    job.setJarByClass(TestMiniMRChildTask.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.waitForCompletion(true);
+    boolean succeeded = job.waitForCompletion(true);
+    assertTrue(succeeded);
     outFs.delete(outDir, true);
   }
@@ -311,20 +290,33 @@
   }
 
-  @Override
-  public void setUp() {
-    try {
-      // create configuration, dfs, file system and mapred cluster
-      dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
-      fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1);
-    } catch (IOException ioe) {
-      tearDown();
-    }
+  @BeforeClass
+  public static void setup() throws IOException {
+    // create configuration, dfs, file system and mapred cluster
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fileSys = dfs.getFileSystem();
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mr == null) {
+      mr = new MiniMRYarnCluster(TestMiniMRChildTask.class.getName());
+      Configuration conf = new Configuration();
+      mr.init(conf);
+      mr.start();
+    }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // workaround the absent public discache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
   }
 
-  @Override
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() {
     // close file system and shut down dfs and mapred cluster
     try {
       if (fileSys != null) {
@@ -334,7 +326,8 @@
         dfs.shutdown();
       }
       if (mr != null) {
-        mr.shutdown();
+        mr.stop();
+        mr = null;
       }
     } catch (IOException ioe) {
       LOG.info("IO exception in closing file system)" );
@@ -351,9 +344,10 @@
    * the directory specified. We create a temp file and check if is is
    * created in the directory specified.
    */
+  @Test
   public void testTaskTempDir(){
     try {
-      JobConf conf = mr.createJobConf();
+      JobConf conf = new JobConf(mr.getConfig());
 
       // intialize input, output directories
       Path inDir = new Path("testing/wc/input");
@@ -375,9 +369,10 @@
    *   - x=y (x can be a already existing env variable or a new variable)
    *   - x=$x:y (replace $x with the current value of x)
    */
+  @Test
   public void testTaskEnv(){
     try {
-      JobConf conf = mr.createJobConf();
+      JobConf conf = new JobConf(mr.getConfig());
       // initialize input, output directories
       Path inDir = new Path("testing/wc/input1");
       Path outDir = new Path("testing/wc/output1");
@@ -399,7 +394,7 @@
    */
   public void testTaskOldEnv(){
     try {
-      JobConf conf = mr.createJobConf();
+      JobConf conf = new JobConf(mr.getConfig());
       // initialize input, output directories
       Path inDir = new Path("testing/wc/input1");
       Path outDir = new Path("testing/wc/output1");
@@ -414,7 +409,7 @@
   }
 
   void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs)
-  throws IOException {
+  throws IOException, InterruptedException, ClassNotFoundException {
     String input = "The input";
     configure(conf, inDir, outDir, input,
               EnvCheckMapper.class, EnvCheckReducer.class);
@@ -445,8 +440,14 @@
     conf.set("path", System.getenv("PATH"));
     conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
     conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);
-    RunningJob job = JobClient.runJob(conf);
-    assertTrue("The environment checker job failed.", job.isSuccessful());
+
+    Job job = new Job(conf);
+    job.addFileToClassPath(APP_JAR);
+    job.setJarByClass(TestMiniMRChildTask.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.waitForCompletion(true);
+    boolean succeeded = job.waitForCompletion(true);
+    assertTrue("The environment checker job failed.", succeeded);
   }
 }

YarnConfiguration.java

@@ -512,6 +512,9 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
       + "application.classpath";
 
+  /** Container temp directory */
+  public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp";
+
   public YarnConfiguration() {
     super();
   }

DefaultContainerExecutor.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -128,6 +129,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     // Create the container log-dirs on all disks
     createContainerLogDirs(appIdStr, containerIdStr, logDirs);
 
+    Path tmpDir = new Path(containerWorkDir,
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+    lfs.mkdir(tmpDir, null, false);
+
     // copy launch script to work dir
     Path launchDst =
         new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
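The mkdir above is the crux of the fix: java.io.File.createTempFile only opens files inside the directory named by java.io.tmpdir, it never creates that directory, so a dangling -Djava.io.tmpdir made every createTempFile call in a task throw. A standalone sketch of that behavior (hypothetical demo class, not Hadoop code):

    import java.io.File;
    import java.io.IOException;

    public class TmpDirDemo {
      public static void main(String[] args) throws IOException {
        File workDir = new File(args.length > 0 ? args[0] : ".");
        File tmpDir = new File(workDir, "tmp");
        // Without this mkdirs (the step DefaultContainerExecutor now
        // performs), the createTempFile call below throws
        // "IOException: No such file or directory".
        if (!tmpDir.isDirectory() && !tmpDir.mkdirs()) {
          throw new IOException("could not create " + tmpDir);
        }
        System.setProperty("java.io.tmpdir", tmpDir.getAbsolutePath());
        File f = File.createTempFile("demo", ".tmp");
        System.out.println("created " + f);
      }
    }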

container-executor.c

@@ -314,6 +314,13 @@ char* get_app_log_directory(const char *log_root, const char* app_id) {
                      app_id);
 }
 
+/**
+ * Get the tmp directory under the working directory
+ */
+char *get_tmp_directory(const char *work_dir) {
+  return concatenate("%s/%s", "tmp dir", 2, work_dir, TMP_DIR);
+}
+
 /**
  * Ensure that the given path and all of the parent directories are created
  * with the desired permissions.
@@ -357,7 +364,7 @@ int mkdirs(const char* path, mode_t perm) {
  * It creates the container work and log directories.
  */
 static int create_container_directories(const char* user, const char *app_id,
-    const char *container_id, char* const* local_dir, char* const* log_dir) {
+    const char *container_id, char* const* local_dir, char* const* log_dir, const char *work_dir) {
   // create dirs as 0750
   const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
   if (app_id == NULL || container_id == NULL || user == NULL) {
@@ -409,6 +416,23 @@ static int create_container_directories(const char* user, const char *app_id,
     }
     free(combined_name);
   }
+
+  if (result != 0) {
+    return result;
+  }
+
+  result = -1;
+  // also make the tmp directory
+  char *tmp_dir = get_tmp_directory(work_dir);
+
+  if (tmp_dir == NULL) {
+    return -1;
+  }
+
+  if (mkdirs(tmp_dir, perms) == 0) {
+    result = 0;
+  }
+  free(tmp_dir);
   return result;
 }
@@ -823,7 +847,7 @@ int launch_container_as_user(const char *user, const char *app_id,
   }
 
   if (create_container_directories(user, app_id, container_id, local_dirs,
-                                   log_dirs) != 0) {
+                                   log_dirs, work_dir) != 0) {
     fprintf(LOGFILE, "Could not create container dirs");
     goto cleanup;
   }

container-executor.h

@@ -64,6 +64,7 @@ enum errorcodes {
 #define CREDENTIALS_FILENAME "container_tokens"
 #define MIN_USERID_KEY "min.user.id"
 #define BANNED_USERS_KEY "banned.users"
+#define TMP_DIR "tmp"
 
 extern struct passwd *user_detail;

mapred-default.xml

@@ -433,18 +433,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapreduce.task.tmp.dir</name>
-  <value>./tmp</value>
-  <description> To set the value of tmp directory for map and reduce tasks.
-  If the value is an absolute path, it is directly assigned. Otherwise, it is
-  prepended with task's working directory. The java tasks are executed with
-  option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and
-  streaming are set with environment variable,
-  TMPDIR='the absolute path of the tmp dir'
-  </description>
-</property>
-
 <property>
   <name>mapreduce.map.log.level</name>
   <value>INFO</value>
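With mapreduce.task.tmp.dir removed from the defaults (and no longer consulted by the child JVM setup above), task code that needs a scratch location should rely on the JVM property, which the framework now points at the container's tmp dir. A trivial sketch (hypothetical demo class):

    public class ShowTaskTmpDir {
      public static void main(String[] args) {
        // Inside a task JVM this prints <container work dir>/tmp,
        // as wired up by this commit.
        System.out.println(System.getProperty("java.io.tmpdir"));
      }
    }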