Merge -c 1181310 from trunk to branch-0.23 to fix MAPREDUCE-3158.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1181311 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-11 00:27:39 +00:00
parent 446be91d0d
commit 8d0daf8843
16 changed files with 44 additions and 8 deletions

View File

@ -1527,6 +1527,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3154. Fix JobSubmitter to check for output specs before copying MAPREDUCE-3154. Fix JobSubmitter to check for output specs before copying
job submission files to fail fast. (Abhijit Suresh Shingate via acmurthy) job submission files to fail fast. (Abhijit Suresh Shingate via acmurthy)
MAPREDUCE-3158. Fix test failures in MRv1 due to default framework being
set to yarn. (Hitesh Shah)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -314,6 +314,8 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
ReduceTask reduce = (ReduceTask)task; ReduceTask reduce = (ReduceTask)task;
// a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner: // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
// set framework name to local to make task local
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
reduce.run(conf, umbilical); reduce.run(conf, umbilical);

View File

@ -342,11 +342,9 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
RawKeyValueIterator rIter = null; RawKeyValueIterator rIter = null;
boolean isLocal = false; boolean isLocal = false;
// local iff framework == classic && master address == local // local iff framework == local
String framework = job.get(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); String framework = job.get(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
if (framework.equals(MRConfig.CLASSIC_FRAMEWORK_NAME)) { isLocal = framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME);
isLocal = "local".equals(job.get(MRConfig.MASTER_ADDRESS, "local"));
}
if (!isLocal) { if (!isLocal) {
Class combinerClass = conf.getCombinerClass(); Class combinerClass = conf.getCombinerClass();

View File

@ -67,6 +67,7 @@ public interface MRConfig {
public static final String FRAMEWORK_NAME = "mapreduce.framework.name"; public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
public static final String CLASSIC_FRAMEWORK_NAME = "classic"; public static final String CLASSIC_FRAMEWORK_NAME = "classic";
public static final String YARN_FRAMEWORK_NAME = "yarn"; public static final String YARN_FRAMEWORK_NAME = "yarn";
public static final String LOCAL_FRAMEWORK_NAME = "local";
public static final String TASK_LOCAL_OUTPUT_CLASS = public static final String TASK_LOCAL_OUTPUT_CLASS =
"mapreduce.task.local.output.class"; "mapreduce.task.local.output.class";

View File

@ -135,7 +135,7 @@ public void testSleepJob() throws IOException, InterruptedException,
} }
Configuration sleepConf = new Configuration(mrCluster.getConfig()); Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied iff framework == classic and master_address == local // set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
SleepJob sleepJob = new SleepJob(); SleepJob sleepJob = new SleepJob();

View File

@ -34,7 +34,7 @@ public class LocalClientProtocolProvider extends ClientProtocolProvider {
@Override @Override
public ClientProtocol create(Configuration conf) throws IOException { public ClientProtocol create(Configuration conf) throws IOException {
String framework = conf.get(MRConfig.FRAMEWORK_NAME); String framework = conf.get(MRConfig.FRAMEWORK_NAME);
if (framework != null && !framework.equals("local")) { if (framework != null && !framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME)) {
return null; return null;
} }
String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local"); String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -200,7 +201,14 @@ protected FileSystem getFileSystem() {
* @return configuration that works on the testcase Hadoop instance * @return configuration that works on the testcase Hadoop instance
*/ */
protected JobConf createJobConf() { protected JobConf createJobConf() {
return (localMR) ? new JobConf() : mrCluster.createJobConf(); if (localMR) {
JobConf conf = new JobConf();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
return conf;
}
else {
return mrCluster.createJobConf();
}
} }
} }

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat; import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
import org.apache.hadoop.mapreduce.MRConfig;
import junit.framework.TestCase; import junit.framework.TestCase;
import java.io.*; import java.io.*;
@ -120,6 +121,7 @@ public void configure(JobConf conf) throws IOException {
conf.setOutputValueClass(IntWritable.class); conf.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR); FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.setMapperClass(Map.class); conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class); conf.setReducerClass(Reduce.class);
conf.setNumMapTasks(1); conf.setNumMapTasks(1);

View File

@ -19,6 +19,8 @@
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.MRConfig;
import junit.framework.TestCase; import junit.framework.TestCase;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
@ -305,6 +307,7 @@ public void configure() throws Exception {
conf.setMapOutputValueClass(IntWritable.class); conf.setMapOutputValueClass(IntWritable.class);
// set up two map jobs, so we can test merge phase in Reduce also // set up two map jobs, so we can test merge phase in Reduce also
conf.setNumMapTasks(2); conf.setNumMapTasks(2);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setOutputFormat(SequenceFileOutputFormat.class);
if (!fs.mkdirs(testdir)) { if (!fs.mkdirs(testdir)) {

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRConfig;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -77,6 +78,8 @@ public void testCustomFile() throws Exception {
conf.setMapperClass(TestMap.class); conf.setMapperClass(TestMap.class);
conf.setReducerClass(TestReduce.class); conf.setReducerClass(TestReduce.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, inDir); FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir); FileOutputFormat.setOutputPath(conf, outDir);

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.JavaSerializationComparator; import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.apache.hadoop.mapreduce.MRConfig;
public class TestJavaSerialization extends TestCase { public class TestJavaSerialization extends TestCase {
@ -109,6 +110,8 @@ public void testMapReduceJob() throws Exception {
conf.setMapperClass(WordCountMapper.class); conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class); conf.setReducerClass(SumReducer.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, INPUT_DIR); FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR); FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
@ -155,6 +158,8 @@ public void testWriteToSequencefile() throws Exception {
conf.setMapperClass(WordCountMapper.class); conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class); conf.setReducerClass(SumReducer.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, INPUT_DIR); FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR); FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.*; import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.mapreduce.MRConfig;
import junit.framework.TestCase; import junit.framework.TestCase;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
@ -90,6 +91,8 @@ public void configure() throws Exception {
conf.setOutputKeyClass(Text.class); conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class); conf.setOutputValueClass(Text.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setOutputFormat(SequenceFileOutputFormat.class);
if (!fs.mkdirs(testdir)) { if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString()); throw new IOException("Mkdirs failed to create " + testdir.toString());

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.Test; import org.junit.Test;
@ -351,6 +352,7 @@ public void testNullKeys() throws Exception {
conf.setInputFormat(SequenceFileInputFormat.class); conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setNumReduceTasks(1); conf.setNumReduceTasks(1);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
JobClient.runJob(conf); JobClient.runJob(conf);
@ -382,6 +384,7 @@ private void checkCompression(boolean compressMapOutputs,
conf.setOutputKeyClass(Text.class); conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class); conf.setOutputValueClass(Text.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
if (includeCombine) { if (includeCombine) {
conf.setCombinerClass(IdentityReducer.class); conf.setCombinerClass(IdentityReducer.class);
} }
@ -445,6 +448,7 @@ public void launch() throws Exception {
} else { } else {
conf = new JobConf(getConf()); conf = new JobConf(getConf());
} }
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.setJarByClass(TestMapRed.class); conf.setJarByClass(TestMapRed.class);
int countsToGo = counts; int countsToGo = counts;
int dist[] = new int[range]; int dist[] = new int[range];
@ -737,6 +741,7 @@ public void runJob(int items) {
conf.setOutputKeyClass(Text.class); conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class); conf.setOutputValueClass(Text.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
if (!fs.mkdirs(testdir)) { if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString()); throw new IOException("Mkdirs failed to create " + testdir.toString());
} }

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;

View File

@ -43,7 +43,7 @@ public void testClusterWithLocalClientProvider() throws Exception {
} }
try { try {
conf.set(MRConfig.FRAMEWORK_NAME, "local"); conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
new Cluster(conf); new Cluster(conf);

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
public class TestMapCollection { public class TestMapCollection {
@ -311,6 +312,7 @@ private static void runTest(String name, int keylen, int vallen,
private static void runTest(String name, Job job) throws Exception { private static void runTest(String name, Job job) throws Exception {
job.setNumReduceTasks(1); job.setNumReduceTasks(1);
job.getConfiguration().set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
job.getConfiguration().setInt(MRJobConfig.IO_SORT_FACTOR, 1000); job.getConfiguration().setInt(MRJobConfig.IO_SORT_FACTOR, 1000);
job.getConfiguration().set("fs.default.name", "file:///"); job.getConfiguration().set("fs.default.name", "file:///");
job.getConfiguration().setInt("test.mapcollection.num.maps", 1); job.getConfiguration().setInt("test.mapcollection.num.maps", 1);