From 907e861241e0ca041fd88c48c69288e549c1bf4a Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Mon, 19 Dec 2011 23:08:19 +0000 Subject: [PATCH] Merge -c 1220996 from trunk to branch-0.23 to fix MAPREDUCE-3563. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1220997 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 6 +- .../apache/hadoop/mapred/LocalJobRunner.java | 40 ++++- .../mapred/TestLocalModeWithNewApis.java | 157 ++++++++++++++++++ .../lib/output/FileOutputCommitter.java | 2 +- 4 files changed, 201 insertions(+), 4 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalModeWithNewApis.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0a70e6bac9e..4974abad684 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -282,7 +282,11 @@ Release 0.23.1 - Unreleased before the job started, so that it works properly with oozie throughout the job execution. (Robert Joseph Evans via vinodkv) - MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url without a port. (atm via harsh) + MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url + without a port. (atm via harsh) + + MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce + apis. 
(acmurthy) Release 0.23.0 - 2011-11-01 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index c8b59ebdac3..7fe5b99aeb7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.TaskTrackerInfo; @@ -52,11 +53,13 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ReflectionUtils; /** Implements MapReduce locally, in-process, for debugging. 
*/
 @InterfaceAudience.Private
@@ -304,12 +307,58 @@ public class LocalJobRunner implements ClientProtocol {
       return executor;
     }
 
+    /**
+     * Creates the output committer used by this job.
+     *
+     * @param newApiCommitter if true, the committer is obtained from the
+     *        OutputFormat configured for the job; otherwise the class named
+     *        by "mapred.output.committer.class" is instantiated, defaulting
+     *        to FileOutputCommitter
+     * @param jobId job id used to build the task attempt context (map task
+     *        0, attempt 0) handed to the OutputFormat
+     * @param conf configuration the committer is created from
+     * @return the committer that was created
+     * @throws Exception if the committer cannot be created
+     */
+    private org.apache.hadoop.mapreduce.OutputCommitter
+    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
+      org.apache.hadoop.mapreduce.OutputCommitter committer = null;
+
+      LOG.info("OutputCommitter set in config "
+          + conf.get("mapred.output.committer.class"));
+
+      if (newApiCommitter) {
+        org.apache.hadoop.mapreduce.TaskID taskId =
+            new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
+            new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+            new TaskAttemptContextImpl(conf, taskAttemptID);
+        OutputFormat outputFormat =
+          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } else {
+        committer = ReflectionUtils.newInstance(conf.getClass(
+            "mapred.output.committer.class", FileOutputCommitter.class,
+            org.apache.hadoop.mapred.OutputCommitter.class), conf);
+      }
+      LOG.info("OutputCommitter is " + committer.getClass().getName());
+      return committer;
+    }
+
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
       JobContext jContext = new JobContextImpl(job, jobId);
-      OutputCommitter outputCommitter = job.getOutputCommitter();
-
+
+      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
+      try {
+        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
+      } catch (Exception e) {
+        LOG.error("Failed to createOutputCommitter", e);
+        return;
+      }
+
       try {
         TaskSplitMetaInfo[] taskSplitMetaInfos =
           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalModeWithNewApis.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalModeWithNewApis.java
new file mode 100644
index 00000000000..e9e04709e32
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalModeWithNewApis.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs a word-count job written against the new
+ * org.apache.hadoop.mapreduce API on the local framework and checks
+ * the job output.
+ */
+public class TestLocalModeWithNewApis {
+
+  public static final Log LOG =
+      LogFactory.getLog(TestLocalModeWithNewApis.class);
+
+  Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testNewApis() throws Exception {
+    Random r = new Random(System.currentTimeMillis());
+    Path tmpBaseDir = new Path("/tmp/wc-" + r.nextInt());
+    final Path inDir = new Path(tmpBaseDir, "input");
+    final Path outDir = new Path(tmpBaseDir, "output");
+    String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+
+    Job job = Job.getInstance(conf, "word count");
+    job.setJarByClass(TestLocalModeWithNewApis.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    assertTrue(job.waitForCompletion(true));
+
+    String output = readOutput(outDir, conf);
+    assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                 "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", output);
+
+    outFs.delete(tmpBaseDir, true);
+  }
+
+  /**
+   * Concatenates the lines of every output file under outDir, terminating
+   * each line with "\n".
+   */
+  static String readOutput(Path outDir, Configuration conf)
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuilder result = new StringBuilder();
+
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+           new Utils.OutputFileUtils.OutputFilesFilter()));
+    for (Path outputFile : fileList) {
+      LOG.info("Path" + ": "+ outputFile);
+      BufferedReader file =
+        new BufferedReader(new InputStreamReader(fs.open(outputFile)));
+      try {
+        String line = file.readLine();
+        while (line != null) {
+          result.append(line);
+          result.append("\n");
+          line = file.readLine();
+        }
+      } finally {
+        // Close the reader even if a read fails part-way through.
+        file.close();
+      }
+    }
+    return result.toString();
+  }
+
+  /** Emits (token, 1) for each whitespace-separated token of a line. */
+  public static class TokenizerMapper
+  extends Mapper<Object, Text, Text, IntWritable>{
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context
+        ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+
+  /** Sums the counts received for each key. */
+  public static class IntSumReducer
+  extends Reducer<Text,IntWritable,Text,IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values,
+        Context context
+        ) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 497ca317fd3..ccd32e0c1bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -233,7 +233,7 @@ public class FileOutputCommitter extends OutputCommitter {
             " directory of task: " + attemptId + " - " + workPath);
       }
       LOG.info("Saved output of task '" + attemptId + "' to " +
-          outputPath);
+          jobOutputPath);
     }
   }
 }