MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that cleanup is now called even if there is an error. The old mapred API already ensures that Mapper.close and Reducer.close are invoked during error handling. Note that it is an incompatible change, however end-users can override Mapper.run and Reducer.run to get the old (inconsistent) behaviour. Contributed by Arun C. Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471556 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-04-24 17:38:43 +00:00
parent 0777474cc2
commit 40e78c2ca2
6 changed files with 458 additions and 37 deletions

View File

@ -339,6 +339,14 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5146. application classloader may be used too early to load
classes. (Sangjin Lee via tomwhite)
MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
cleanup is now called even if there is an error. The old mapred API
already ensures that Mapper.close and Reducer.close are invoked during
error handling. Note that it is an incompatible change, however end-users
can override Mapper.run and Reducer.run to get the old (inconsistent)
behaviour. (acmurthy)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -434,10 +434,15 @@ public class MapTask extends Task {
}
statusUpdate(umbilical);
collector.flush();
} finally {
//close
in.close(); // close input
in.close();
in = null;
collector.close();
collector = null;
} finally {
closeQuietly(in);
closeQuietly(collector);
}
}
@ -753,13 +758,20 @@ public class MapTask extends Task {
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
output.close(mapperContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
class DirectMapOutputCollector<K, V>
@ -1949,4 +1961,55 @@ public class MapTask extends Task {
}
}
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void closeQuietly(RecordReader<INKEY, INVALUE> c) {
if (c != null) {
try {
c.close();
} catch (IOException ie) {
// Ignore
LOG.info("Ignoring exception during close for " + c, ie);
}
}
}
private <OUTKEY, OUTVALUE>
void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
if (c != null) {
try {
c.close();
} catch (Exception ie) {
// Ignore
LOG.info("Ignoring exception during close for " + c, ie);
}
}
}
private <INKEY, INVALUE, OUTKEY, OUTVALUE>
void closeQuietly(
org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
if (c != null) {
try {
c.close();
} catch (Exception ie) {
// Ignore
LOG.info("Ignoring exception during close for " + c, ie);
}
}
}
private <INKEY, INVALUE, OUTKEY, OUTVALUE>
void closeQuietly(
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext) {
if (c != null) {
try {
c.close(mapperContext);
} catch (Exception ie) {
// Ignore
LOG.info("Ignoring exception during close for " + c, ie);
}
}
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@ -428,14 +429,15 @@ public class ReduceTask extends Task {
// make output collector
String finalName = getOutputName(getPartition());
final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
this, job, reporter, finalName);
final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
OutputCollector<OUTKEY,OUTVALUE> collector =
new OutputCollector<OUTKEY,OUTVALUE>() {
public void collect(OUTKEY key, OUTVALUE value)
throws IOException {
out.write(key, value);
finalOut.write(key, value);
// indicate that progress update needs to be sent
reporter.progress();
}
@ -466,20 +468,14 @@ public class ReduceTask extends Task {
values.informReduceProgress();
}
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
//End of clean up.
} catch (IOException ioe) {
try {
reducer.close();
} catch (IOException ignored) {}
try {
out.close(reporter);
} catch (IOException ignored) {}
reducer = null;
throw ioe;
out.close(reporter);
out = null;
} finally {
IOUtils.cleanup(LOG, reducer);
closeQuietly(out, reporter);
}
}
@ -645,7 +641,21 @@ public class ReduceTask extends Task {
committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
trackedRW.close(reducerContext);
try {
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
private <OUTKEY, OUTVALUE>
void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
if (c != null) {
try {
c.close(r);
} catch (Exception e) {
LOG.info("Exception in closing " + c, e);
}
}
}
}

View File

@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
cleanup(context);
}
}

View File

@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
cleanup(context);
}
}

View File

@ -0,0 +1,334 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.junit.Assert;
import org.junit.Test;
public class TestMapperReducerCleanup {
static boolean mapCleanup = false;
static boolean reduceCleanup = false;
static boolean recordReaderCleanup = false;
static boolean recordWriterCleanup = false;
static void reset() {
mapCleanup = false;
reduceCleanup = false;
recordReaderCleanup = false;
recordWriterCleanup = false;
}
private static class FailingMapper
extends Mapper<LongWritable, Text, LongWritable, Text> {
/** Map method with different behavior based on the thread id */
public void map(LongWritable key, Text val, Context c)
throws IOException, InterruptedException {
throw new IOException("TestMapperReducerCleanup");
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
mapCleanup = true;
super.cleanup(context);
}
}
private static class TrackingTokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
mapCleanup = true;
super.cleanup(context);
}
}
private static class FailingReducer
extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
public void reduce(LongWritable key, Iterable<Text> vals, Context context)
throws IOException, InterruptedException {
throw new IOException("TestMapperReducerCleanup");
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
reduceCleanup = true;
super.cleanup(context);
}
}
@SuppressWarnings("rawtypes")
private static class TrackingIntSumReducer extends IntSumReducer {
@SuppressWarnings("unchecked")
protected void cleanup(Context context)
throws IOException, InterruptedException {
reduceCleanup = true;
super.cleanup(context);
}
}
public static class TrackingTextInputFormat extends TextInputFormat {
public static class TrackingRecordReader extends LineRecordReader {
@Override
public synchronized void close() throws IOException {
recordReaderCleanup = true;
super.close();
}
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new TrackingRecordReader();
}
}
@SuppressWarnings("rawtypes")
public static class TrackingTextOutputFormat extends TextOutputFormat {
public static class TrackingRecordWriter extends LineRecordWriter {
public TrackingRecordWriter(DataOutputStream out) {
super(out);
}
@Override
public synchronized void close(TaskAttemptContext context)
throws IOException {
recordWriterCleanup = true;
super.close(context);
}
}
@Override
public RecordWriter getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
Path file = getDefaultWorkFile(job, "");
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return new TrackingRecordWriter(fileOut);
}
}
/**
* Create a single input file in the input directory.
* @param dirPath the directory in which the file resides
* @param id the file id number
* @param numRecords how many records to write to each file.
*/
private void createInputFile(Path dirPath, int id, int numRecords)
throws IOException {
final String MESSAGE = "This is a line in a file: ";
Path filePath = new Path(dirPath, "" + id);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
OutputStream os = fs.create(filePath);
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
for (int i = 0; i < numRecords; i++) {
w.write(MESSAGE + id + " " + i + "\n");
}
w.close();
}
private final String INPUT_DIR = "input";
private final String OUTPUT_DIR = "output";
private Path getInputPath() {
String dataDir = System.getProperty("test.build.data");
if (null == dataDir) {
return new Path(INPUT_DIR);
} else {
return new Path(new Path(dataDir), INPUT_DIR);
}
}
private Path getOutputPath() {
String dataDir = System.getProperty("test.build.data");
if (null == dataDir) {
return new Path(OUTPUT_DIR);
} else {
return new Path(new Path(dataDir), OUTPUT_DIR);
}
}
private Path createInput() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path inputPath = getInputPath();
// Clear the input directory if it exists, first.
if (fs.exists(inputPath)) {
fs.delete(inputPath, true);
}
// Create an input file
createInputFile(inputPath, 0, 10);
return inputPath;
}
@Test
public void testMapCleanup() throws Exception {
reset();
Job job = Job.getInstance();
Path inputPath = createInput();
Path outputPath = getOutputPath();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
job.setMapperClass(FailingMapper.class);
job.setInputFormatClass(TrackingTextInputFormat.class);
job.setOutputFormatClass(TrackingTextOutputFormat.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
Assert.assertTrue(mapCleanup);
Assert.assertTrue(recordReaderCleanup);
Assert.assertTrue(recordWriterCleanup);
}
@Test
public void testReduceCleanup() throws Exception {
reset();
Job job = Job.getInstance();
Path inputPath = createInput();
Path outputPath = getOutputPath();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
job.setMapperClass(TrackingTokenizerMapper.class);
job.setReducerClass(FailingReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TrackingTextInputFormat.class);
job.setOutputFormatClass(TrackingTextOutputFormat.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
Assert.assertTrue(mapCleanup);
Assert.assertTrue(reduceCleanup);
Assert.assertTrue(recordReaderCleanup);
Assert.assertTrue(recordWriterCleanup);
}
@Test
public void testJobSuccessCleanup() throws Exception {
reset();
Job job = Job.getInstance();
Path inputPath = createInput();
Path outputPath = getOutputPath();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
job.setMapperClass(TrackingTokenizerMapper.class);
job.setReducerClass(TrackingIntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TrackingTextInputFormat.class);
job.setOutputFormatClass(TrackingTextOutputFormat.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
Assert.assertTrue(mapCleanup);
Assert.assertTrue(reduceCleanup);
Assert.assertTrue(recordReaderCleanup);
Assert.assertTrue(recordWriterCleanup);
}
}