From 96d4812433d6399bb405b6c14dab556fff211f4d Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Thu, 29 Mar 2012 07:47:55 +0000 Subject: [PATCH] Merge -c 1306736 from trunk to branch-2 to fix MAPREDUCE-3377. Added a unit test to ensure OutputCommitter.checkOutputSpecs is called prior to copying job.xml. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1306737 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/TestMROutputFormat.java | 206 ++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMROutputFormat.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 210f21cced9..af7365fc990 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -86,6 +86,9 @@ Release 2.0.0 - UNRELEASED MAPREDUCE-4066. Use default value when fetching MR_AM_STAGING_DIR (xieguiming via harsh) + MAPREDUCE-3377. Added a unit test to ensure OutputCommitter.checkOutputSpecs + is called prior to copying job.xml. (Jane Chen via acmurthy) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMROutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMROutputFormat.java new file mode 100644 index 00000000000..6a693e9fa91 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMROutputFormat.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class TestConfInCheckSpec { + + @Test + public void testJobSubmission() throws Exception { + JobConf conf = new JobConf(); + Job job = new Job(conf); + job.setInputFormatClass(TestInputFormat.class); + job.setMapperClass(TestMapper.class); + job.setOutputFormatClass(TestOutputFormat.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(IntWritable.class); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + } + + public static class TestMapper + extends Mapper { + public void map(IntWritable key, IntWritable value, Context context) + throws IOException, InterruptedException { + context.write(key, value); + } + } +} + +class TestInputFormat extends InputFormat { + + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + return new RecordReader() { + + private boolean done = false; + + @Override + public void close() throws IOException { + } + + @Override + public IntWritable getCurrentKey() throws IOException, + InterruptedException { + return new IntWritable(0); + } + + @Override + public IntWritable getCurrentValue() throws IOException, + InterruptedException { + return new IntWritable(0); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return done ? 0 : 1; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (!done) { + done = true; + return true; + } + return false; + } + }; + } + + @Override + public List getSplits(JobContext context) throws IOException, + InterruptedException { + List list = new ArrayList(); + list.add(new TestInputSplit()); + return list; + } +} + +class TestInputSplit extends InputSplit implements Writable { + + @Override + public long getLength() throws IOException, InterruptedException { + return 1; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + String[] hosts = {"localhost"}; + return hosts; + } + + @Override + public void readFields(DataInput in) throws IOException { + } + + @Override + public void write(DataOutput out) throws IOException { + } +} + +class TestOutputFormat extends OutputFormat +implements Configurable { + + public static final String TEST_CONFIG_NAME = "mapred.test.jobsubmission"; + private Configuration conf; + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, + InterruptedException { + conf.setBoolean(TEST_CONFIG_NAME, true); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new OutputCommitter() { + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + return false; + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + } + }; + } + + @Override + public RecordWriter getRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + assertTrue(context.getConfiguration().getBoolean(TEST_CONFIG_NAME, false)); + return new RecordWriter() { + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + } + + @Override + public void write(IntWritable key, IntWritable value) throws IOException, + InterruptedException { + } + }; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } +} \ No newline at end of file