MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use java.io.File.createTempFile to create temporary files as part of their tasks. Contributed by Jonathan Eagles.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1238136 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2012-01-31 02:23:38 +00:00
parent 7e4725db41
commit 76b653a367
8 changed files with 139 additions and 111 deletions
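Before this change, the child JVM was pointed at a tmp directory under the container's working directory that nothing actually created, so JDK calls such as java.io.File.createTempFile failed inside tasks; the hunks below create that directory in both container executors and share one constant for its name. A minimal sketch of the task-side behavior this enables (the mapper below is illustrative, not part of the commit):

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: with <container-work-dir>/tmp created and
// java.io.tmpdir pointing at it, plain JDK temp-file calls now work
// inside a task.
public class TempFileMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Lands in <container-work-dir>/tmp; before this fix the call failed
    // because the directory named by java.io.tmpdir did not exist.
    File scratch = File.createTempFile("task-scratch", ".tmp");
    scratch.deleteOnExit();
    context.write(new Text(scratch.getAbsolutePath()), NullWritable.get());
  }
}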

CHANGES.txt

@@ -611,6 +611,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3703. ResourceManager should provide node lists in JMX output.
(Eric Payne via mahadev)

MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use
java.io.File.createTempFile to create temporary files as part of their
tasks. (Jonathan Eagles via vinodkv)

Release 0.23.0 - 2011-11-01

INCOMPATIBLE CHANGES

MapReduceChildJVM.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
@SuppressWarnings("deprecation")
@@ -201,7 +202,8 @@ public class MapReduceChildJVM {
vargs.add(javaOptsSplit[i]);
}
String childTmpDir = Environment.PWD.$() + Path.SEPARATOR + "tmp";
Path childTmpDir = new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Setup the log4j prop
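A sketch of what this hunk produces on the java command line (the class wrapper is an assumption, not the committed method; Environment.PWD.$() expands to the container work dir at launch, and DEFAULT_CONTAINER_TEMP_DIR is "./tmp"):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: the tmpdir option is now built from the shared YARN constant
// instead of a hand-assembled "<pwd>/tmp" string, so it names the same
// directory the node manager creates.
public class ChildTmpDirOption {
  public static void main(String[] args) {
    List<String> vargs = new ArrayList<String>();
    Path childTmpDir = new Path(Environment.PWD.$(),
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
    System.out.println(vargs); // prints [-Djava.io.tmpdir=$PWD/tmp] on Unix
  }
}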

TestMiniMRChildTask.java

@@ -17,10 +17,17 @@
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.*;
import java.util.Iterator;
import junit.framework.TestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,20 +36,21 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Ignore;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
/**
* Class to test mapred task's
* - temp directory
* - child env
*/
@Ignore
public class TestMiniMRChildTask extends TestCase {
public class TestMiniMRChildTask {
private static final Log LOG =
LogFactory.getLog(TestMiniMRChildTask.class.getName());
@@ -51,10 +59,24 @@ public class TestMiniMRChildTask extends TestCase {
private final static String MAP_OPTS_VAL = "-Xmx200m";
private final static String REDUCE_OPTS_VAL = "-Xmx300m";
private MiniMRCluster mr;
private MiniDFSCluster dfs;
private FileSystem fileSys;
private static MiniMRYarnCluster mr;
private static MiniDFSCluster dfs;
private static FileSystem fileSys;
private static Configuration conf = new Configuration();
private static FileSystem localFs;
static {
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException io) {
throw new RuntimeException("problem getting local fs", io);
}
}
private static Path TEST_ROOT_DIR = new Path("target",
TestMiniMRChildTask.class.getName() + "-tmpDir").makeQualified(localFs);
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
/**
* Map class which checks whether temp directory exists
* and check the value of java.io.tmpdir
@@ -62,34 +84,26 @@
* temp directory specified.
*/
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
Path tmpDir;
FileSystem localFs;
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String tmp = null;
if (localFs.exists(tmpDir)) {
tmp = tmpDir.makeQualified(localFs).toString();
assertEquals(tmp, new Path(System.getProperty("java.io.tmpdir")).
makeQualified(localFs).toString());
} else {
fail("Temp directory "+tmpDir +" doesnt exist.");
}
File tmpFile = File.createTempFile("test", ".tmp");
assertEquals(tmp, new Path(tmpFile.getParent()).
makeQualified(localFs).toString());
}
public void configure(JobConf job) {
tmpDir = new Path(job.get(JobContext.TASK_TEMP_DIR, "./tmp"));
try {
localFs = FileSystem.getLocal(job);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("IOException in getting localFS");
}
}
implements Mapper<LongWritable, Text, Text, IntWritable> {
Path tmpDir;
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
if (localFs.exists(tmpDir)) {
} else {
fail("Temp directory " + tmpDir +" doesnt exist.");
}
File tmpFile = File.createTempFile("test", ".tmp");
}
public void configure(JobConf job) {
tmpDir = new Path(System.getProperty("java.io.tmpdir"));
try {
localFs = FileSystem.getLocal(job);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("IOException in getting localFS");
}
}
}
// configure a job
@@ -136,7 +150,7 @@
Path inDir,
Path outDir,
String input)
throws IOException {
throws IOException, InterruptedException, ClassNotFoundException {
configure(conf, inDir, outDir, input,
MapClass.class, IdentityReducer.class);
@@ -144,48 +158,13 @@
// Launch job with default option for temp dir.
// i.e. temp dir is ./tmp
JobClient.runJob(conf);
outFs.delete(outDir, true);
final String DEFAULT_ABS_TMP_PATH = "/tmp";
final String DEFAULT_REL_TMP_PATH = "../temp";
String absoluteTempPath = null;
String relativeTempPath = null;
for (String key : new String[] { "test.temp.dir", "test.tmp.dir" }) {
String p = conf.get(key);
if (p == null || p.isEmpty()) {
continue;
}
if (new Path(p).isAbsolute()) {
if (absoluteTempPath == null) {
absoluteTempPath = p;
}
} else {
if (relativeTempPath == null) {
relativeTempPath = p;
}
}
}
if (absoluteTempPath == null) {
absoluteTempPath = DEFAULT_ABS_TMP_PATH;
}
if (relativeTempPath == null) {
relativeTempPath = DEFAULT_REL_TMP_PATH;
}
// Launch job by giving relative path to temp dir.
LOG.info("Testing with relative temp dir = "+relativeTempPath);
conf.set("mapred.child.tmp", relativeTempPath);
JobClient.runJob(conf);
outFs.delete(outDir, true);
// Launch job by giving absolute path to temp dir
LOG.info("Testing with absolute temp dir = "+absoluteTempPath);
conf.set("mapred.child.tmp", absoluteTempPath);
JobClient.runJob(conf);
Job job = new Job(conf);
job.addFileToClassPath(APP_JAR);
job.setJarByClass(TestMiniMRChildTask.class);
job.setMaxMapAttempts(1); // speed up failures
job.waitForCompletion(true);
boolean succeeded = job.waitForCompletion(true);
assertTrue(succeeded);
outFs.delete(outDir, true);
}
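The test now submits through the org.apache.hadoop.mapreduce.Job API rather than JobClient.runJob. Condensed, the pattern used above is roughly the following sketch (APP_JAR is supplied by the test's own setup):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

// Condensed sketch of the submission pattern the test switches to.
public class SubmitSketch {
  static boolean submit(Configuration conf, Path appJar, Class<?> jobClass)
      throws Exception {
    Job job = new Job(conf);            // Job.getInstance(conf) in later APIs
    job.addFileToClassPath(appJar);     // ship the MR app jar with the job
    job.setJarByClass(jobClass);        // locate the jar containing the test
    job.setMaxMapAttempts(1);           // fail fast instead of retrying maps
    return job.waitForCompletion(true); // true => report progress while waiting
  }
}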
@@ -311,20 +290,33 @@ public class TestMiniMRChildTask extends TestCase {
}
@Override
public void setUp() {
try {
// create configuration, dfs, file system and mapred cluster
dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1);
} catch (IOException ioe) {
tearDown();
@BeforeClass
public static void setup() throws IOException {
// create configuration, dfs, file system and mapred cluster
dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
if (mr == null) {
mr = new MiniMRYarnCluster(TestMiniMRChildTask.class.getName());
Configuration conf = new Configuration();
mr.init(conf);
mr.start();
}
// Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
// work around the absent public dist cache.
localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
localFs.setPermission(APP_JAR, new FsPermission("700"));
}
@Override
public void tearDown() {
@AfterClass
public static void tearDown() {
// close file system and shut down dfs and mapred cluster
try {
if (fileSys != null) {
@@ -334,7 +326,8 @@
dfs.shutdown();
}
if (mr != null) {
mr.shutdown();
mr.stop();
mr = null;
}
} catch (IOException ioe) {
LOG.info("IO exception in closing file system)" );
@@ -351,9 +344,10 @@
* the directory specified. We create a temp file and check if it is
* created in the directory specified.
*/
@Test
public void testTaskTempDir(){
try {
JobConf conf = mr.createJobConf();
JobConf conf = new JobConf(mr.getConfig());
// initialize input, output directories
Path inDir = new Path("testing/wc/input");
@@ -375,9 +369,10 @@
* - x=y (x can be a already existing env variable or a new variable)
* - x=$x:y (replace $x with the current value of x)
*/
public void testTaskEnv(){
try {
JobConf conf = mr.createJobConf();
JobConf conf = new JobConf(mr.getConfig());
// initialize input, output directories
Path inDir = new Path("testing/wc/input1");
Path outDir = new Path("testing/wc/output1");
@@ -399,7 +394,7 @@
*/
public void testTaskOldEnv(){
try {
JobConf conf = mr.createJobConf();
JobConf conf = new JobConf(mr.getConfig());
// initialize input, output directories
Path inDir = new Path("testing/wc/input1");
Path outDir = new Path("testing/wc/output1");
@@ -414,7 +409,7 @@
}
void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs)
throws IOException {
throws IOException, InterruptedException, ClassNotFoundException {
String input = "The input";
configure(conf, inDir, outDir, input,
EnvCheckMapper.class, EnvCheckReducer.class);
@@ -445,8 +440,14 @@
conf.set("path", System.getenv("PATH"));
conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);
RunningJob job = JobClient.runJob(conf);
assertTrue("The environment checker job failed.", job.isSuccessful());
Job job = new Job(conf);
job.addFileToClassPath(APP_JAR);
job.setJarByClass(TestMiniMRChildTask.class);
job.setMaxMapAttempts(1); // speed up failures
job.waitForCompletion(true);
boolean succeeded = job.waitForCompletion(true);
assertTrue("The environment checker job failed.", succeeded);
}
}

YarnConfiguration.java

@@ -512,6 +512,9 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
/** Container temp directory */
public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp";
public YarnConfiguration() {
super();
}
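Both MapReduceChildJVM above and DefaultContainerExecutor below resolve this constant against the container's working directory. A sketch of that resolution (the work-dir path below is hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Illustrative only; the container work dir below is hypothetical.
public class TmpDirResolution {
  public static void main(String[] args) {
    Path containerWorkDir =
        new Path("/local/usercache/user/appcache/app_1/container_1");
    // The node manager creates this directory and the child JVM points
    // java.io.tmpdir at it; the two sides agree because they share the
    // same relative constant ("./tmp").
    Path tmpDir = new Path(containerWorkDir,
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    System.out.println(tmpDir);
    // -> /local/usercache/user/appcache/app_1/container_1/tmp
  }
}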

DefaultContainerExecutor.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -128,6 +129,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// Create the container log-dirs on all disks
createContainerLogDirs(appIdStr, containerIdStr, logDirs);
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
lfs.mkdir(tmpDir, null, false);
// copy launch script to work dir
Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
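Pulled out of its surroundings, the executor-side half of the fix amounts to the following self-contained sketch (in the class above, lfs is the executor's local FileContext; the main wrapper is an assumption for illustration):

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: create <container-work-dir>/tmp before launch so that
// java.io.tmpdir names an existing directory on first use.
public class CreateContainerTmpDir {
  public static void main(String[] args) throws Exception {
    FileContext lfs = FileContext.getLocalFSFileContext();
    Path containerWorkDir = new Path(args[0]); // pass a work dir to try it
    Path tmpDir = new Path(containerWorkDir,
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    lfs.mkdir(tmpDir, null, false); // default permissions, no parent dirs
  }
}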

container-executor.c

@@ -314,6 +314,13 @@ char* get_app_log_directory(const char *log_root, const char* app_id) {
app_id);
}
/**
* Get the tmp directory under the working directory
*/
char *get_tmp_directory(const char *work_dir) {
return concatenate("%s/%s", "tmp dir", 2, work_dir, TMP_DIR);
}
/**
* Ensure that the given path and all of the parent directories are created
* with the desired permissions.
@@ -357,7 +364,7 @@ int mkdirs(const char* path, mode_t perm) {
* It creates the container work and log directories.
*/
static int create_container_directories(const char* user, const char *app_id,
const char *container_id, char* const* local_dir, char* const* log_dir) {
const char *container_id, char* const* local_dir, char* const* log_dir, const char *work_dir) {
// create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
if (app_id == NULL || container_id == NULL || user == NULL) {
@@ -409,6 +416,23 @@ static int create_container_directories(const char* user, const char *app_id,
}
free(combined_name);
}
if (result != 0) {
return result;
}
result = -1;
// also make the tmp directory
char *tmp_dir = get_tmp_directory(work_dir);
if (tmp_dir == NULL) {
return -1;
}
if (mkdirs(tmp_dir, perms) == 0) {
result = 0;
}
free(tmp_dir);
return result;
}
@@ -823,7 +847,7 @@ int launch_container_as_user(const char *user, const char *app_id,
}
if (create_container_directories(user, app_id, container_id, local_dirs,
log_dirs) != 0) {
log_dirs, work_dir) != 0) {
fprintf(LOGFILE, "Could not create container dirs");
goto cleanup;
}

container-executor.h

@@ -64,6 +64,7 @@ enum errorcodes {
#define CREDENTIALS_FILENAME "container_tokens"
#define MIN_USERID_KEY "min.user.id"
#define BANNED_USERS_KEY "banned.users"
#define TMP_DIR "tmp"
extern struct passwd *user_detail;

mapred-default.xml

@@ -433,18 +433,6 @@
</description>
</property>
<property>
<name>mapreduce.task.tmp.dir</name>
<value>./tmp</value>
<description> To set the value of tmp directory for map and reduce tasks.
If the value is an absolute path, it is directly assigned. Otherwise, it is
prepended with task's working directory. The java tasks are executed with
option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and
streaming are set with environment variable,
TMPDIR='the absolute path of the tmp dir'
</description>
</property>
<property>
<name>mapreduce.map.log.level</name>
<value>INFO</value>
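The description removed above was the old contract for mapreduce.task.tmp.dir, which this commit retires in favor of the fixed ./tmp under the container work dir. For reference, a hypothetical helper restating the removed resolution rule (not code from the commit):

import java.io.File;

// Hypothetical restatement of the removed description: an absolute value
// was used as-is, a relative one was prepended with the task's working
// directory.
public class OldChildTmpRule {
  static String resolve(String childTmp, String taskWorkDir) {
    File tmp = new File(childTmp);
    return tmp.isAbsolute() ? tmp.getPath()
                            : new File(taskWorkDir, childTmp).getPath();
  }

  public static void main(String[] args) {
    System.out.println(resolve("./tmp", "/work/container_1"));
    // -> /work/container_1/./tmp
    System.out.println(resolve("/scratch", "/work/container_1"));
    // -> /scratch
  }
}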