diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fe8ee947b09..02099b7ce7a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -162,6 +162,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5746. Job diagnostics can implicate wrong task for a failed job.
     (Jason Lowe via kasha)
 
+    MAPREDUCE-5670. CombineFileRecordReader should report progress when moving
+    to the next file (Chen He via jlowe)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
index 1abaef260c5..f54f1760d7d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
@@ -140,6 +140,8 @@ public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
       return false;
     }
 
+    reporter.progress();
+
     // get a record reader for the idx-th chunk
     try {
       curReader = rrConstructor.newInstance(new Object []
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
index fb86cbafc12..767f79a1c02 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
@@ -54,7 +54,7 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
   protected int idx;
   protected long progress;
   protected RecordReader<K,V> curReader;
-  
+
   public void initialize(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {
     this.split = (CombineFileSplit)split;
@@ -144,6 +144,8 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
       return false;
     }
 
+    context.progress();
+
     // get a record reader for the idx-th chunk
     try {
       Configuration conf = context.getConfiguration();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/lib/TestCombineFileRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/lib/TestCombineFileRecordReader.java
new file mode 100644
index 00000000000..296aa232389
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/lib/TestCombineFileRecordReader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.File;
+import java.io.FileWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestCombineFileRecordReader {
+
+  private static Path outDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), TestCombineFileRecordReader.class.getName());
+
+  private static class TextRecordReaderWrapper
+      extends org.apache.hadoop.mapred.lib.CombineFileRecordReaderWrapper<LongWritable, Text> {
+    // this constructor signature is required by CombineFileRecordReader
+    public TextRecordReaderWrapper(CombineFileSplit split, Configuration conf,
+        Reporter reporter, Integer idx) throws IOException {
+      super(new TextInputFormat(), split, conf, reporter, idx);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testInitNextRecordReader() throws IOException {
+    JobConf conf = new JobConf();
+    Path[] paths = new Path[3];
+    long[] fileLength = new long[3];
+    File[] files = new File[3];
+    LongWritable key = new LongWritable(1);
+    Text value = new Text();
+    try {
+      for (int i = 0; i < 3; i++) {
+        fileLength[i] = i;
+        File dir = new File(outDir.toString());
+        dir.mkdir();
+        files[i] = new File(dir, "testfile" + i);
+        FileWriter fileWriter = new FileWriter(files[i]);
+        fileWriter.close();
+        paths[i] = new Path(outDir + "/testfile" + i);
+      }
+      CombineFileSplit combineFileSplit = new CombineFileSplit(conf, paths, fileLength);
+      Reporter reporter = Mockito.mock(Reporter.class);
+      CombineFileRecordReader cfrr = new CombineFileRecordReader(conf, combineFileSplit,
+          reporter, TextRecordReaderWrapper.class);
+      verify(reporter).progress();
+      Assert.assertFalse(cfrr.next(key, value));
+      verify(reporter, times(3)).progress();
+    } finally {
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    }
+
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java
new file mode 100644
index 00000000000..052195efcdc
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.File;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+import org.mockito.Mockito;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestCombineFileRecordReader {
+
+  private static Path outDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), TestCombineFileRecordReader.class.getName());
+
+  private static class TextRecordReaderWrapper
+      extends CombineFileRecordReaderWrapper<LongWritable, Text> {
+    // this constructor signature is required by CombineFileRecordReader
+    public TextRecordReaderWrapper(org.apache.hadoop.mapreduce.lib.input.CombineFileSplit split,
+        TaskAttemptContext context, Integer idx)
+        throws IOException, InterruptedException {
+      super(new TextInputFormat(), split, context, idx);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProgressIsReportedIfInputASeriesOfEmptyFiles()
+      throws IOException, InterruptedException {
+    JobConf conf = new JobConf();
+    Path[] paths = new Path[3];
+    File[] files = new File[3];
+    long[] fileLength = new long[3];
+
+    try {
+      for (int i = 0; i < 3; i++) {
+        File dir = new File(outDir.toString());
+        dir.mkdir();
+        files[i] = new File(dir, "testfile" + i);
+        FileWriter fileWriter = new FileWriter(files[i]);
+        fileWriter.flush();
+        fileWriter.close();
+        fileLength[i] = i;
+        paths[i] = new Path(outDir + "/testfile" + i);
+      }
+
+      CombineFileSplit combineFileSplit = new CombineFileSplit(paths, fileLength);
+      TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
+      TaskReporter reporter = Mockito.mock(TaskReporter.class);
+      TaskAttemptContextImpl taskAttemptContext =
+          new TaskAttemptContextImpl(conf, taskAttemptID, reporter);
+
+      CombineFileRecordReader cfrr = new CombineFileRecordReader(combineFileSplit,
+          taskAttemptContext, TextRecordReaderWrapper.class);
+
+      cfrr.initialize(combineFileSplit, taskAttemptContext);
+
+      verify(reporter).progress();
+      Assert.assertFalse(cfrr.nextKeyValue());
+      verify(reporter, times(3)).progress();
+    } finally {
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    }
+  }
+}
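
Note for reviewers: the behavior the patch establishes can be summarized with a small self-contained sketch. This is not the Hadoop class itself — ChunkReader, Progressable, and CombiningReaderSketch below are illustrative stand-ins, not Hadoop APIs. The point it demonstrates is why the tests expect exactly three progress() calls for three empty files: a CombineFileSplit made of many empty chunks never yields a record, so without a heartbeat on each advance the task can be killed for inactivity (mapreduce.task.timeout).

import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for the per-chunk record reader.
interface ChunkReader {
  boolean next() throws IOException;   // false when the chunk is exhausted
  void close() throws IOException;
}

// Hypothetical stand-in for the framework's progress callback.
interface Progressable {
  void progress();                     // heartbeat back to the framework
}

class CombiningReaderSketch {
  private final List<ChunkReader> chunks;
  private final Progressable reporter;
  private ChunkReader cur;
  private int idx;

  CombiningReaderSketch(List<ChunkReader> chunks, Progressable reporter)
      throws IOException {
    this.chunks = chunks;
    this.reporter = reporter;
    initNextChunkReader();             // mirrors the constructor-time init
  }

  private boolean initNextChunkReader() throws IOException {
    if (cur != null) {
      cur.close();
      cur = null;
    }
    if (idx == chunks.size()) {
      return false;                    // all chunks consumed
    }
    reporter.progress();               // the line the patch adds: heartbeat
                                       // even when a chunk yields no records
    cur = chunks.get(idx++);
    return true;
  }

  boolean next() throws IOException {
    while (cur == null || !cur.next()) {
      if (!initNextChunkReader()) {
        return false;                  // empty chunks are skipped, but each
      }                                // skip still reported progress above
    }
    return true;
  }
}

With three empty chunks, construction reports progress once (first chunk), and next() reports twice more while draining the remaining two before returning false — matching verify(reporter).progress() after construction and verify(reporter, times(3)).progress() after the final next()/nextKeyValue() in the new tests.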