diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b73612d2340..562938f517f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -493,6 +493,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4772. Fetch failures can take way too long for a map to be restarted (bobby) + + MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit + (Mark Fuhs via bobby) Release 0.23.4 - UNRELEASED diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java index 2c7c63dd552..758996165f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java @@ -107,25 +107,14 @@ public class NLineInputFormat extends FileInputFormat { numLines++; length += num; if (numLines == numLinesPerSplit) { - // NLineInputFormat uses LineRecordReader, which always reads - // (and consumes) at least one character out of its upper split - // boundary. So to make sure that each mapper gets N lines, we - // move back the upper split limits of each split - // by one character here. - if (begin == 0) { - splits.add(new FileSplit(fileName, begin, length - 1, - new String[] {})); - } else { - splits.add(new FileSplit(fileName, begin - 1, length, - new String[] {})); - } + splits.add(createFileSplit(fileName, begin, length)); begin += length; length = 0; numLines = 0; } } if (numLines != 0) { - splits.add(new FileSplit(fileName, begin, length, new String[]{})); + splits.add(createFileSplit(fileName, begin, length)); } } finally { if (lr != null) { @@ -134,6 +123,23 @@ public class NLineInputFormat extends FileInputFormat { } return splits; } + + /** + * NLineInputFormat uses LineRecordReader, which always reads + * (and consumes) at least one character out of its upper split + * boundary. So to make sure that each mapper gets N lines, we + * move back the upper split limits of each split + * by one character here. + * @param fileName Path of file + * @param begin the position of the first byte in the file to process + * @param length number of bytes in InputSplit + * @return FileSplit + */ + protected static FileSplit createFileSplit(Path fileName, long begin, long length) { + return (begin == 0) + ? new FileSplit(fileName, begin, length - 1, new String[] {}) + : new FileSplit(fileName, begin - 1, length, new String[] {}); + } /** * Set the number of lines per split diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java index aca8e53af53..7b3878d9475 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java @@ -50,37 +50,40 @@ public class TestNLineInputFormat extends TestCase { Job job = Job.getInstance(conf); Path file = new Path(workDir, "test.txt"); - int seed = new Random().nextInt(); - Random random = new Random(seed); - localFs.delete(workDir, true); FileInputFormat.setInputPaths(job, workDir); int numLinesPerMap = 5; NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap); - // for a variety of lengths for (int length = 0; length < MAX_LENGTH; - length += random.nextInt(MAX_LENGTH / 10) + 1) { + length += 1) { + // create a file with length entries Writer writer = new OutputStreamWriter(localFs.create(file)); try { for (int i = 0; i < length; i++) { - writer.write(Integer.toString(i)); + writer.write(Integer.toString(i)+" some more text"); writer.write("\n"); } } finally { writer.close(); } - checkFormat(job, numLinesPerMap); + int lastN = 0; + if (length != 0) { + lastN = length % 5; + if (lastN == 0) { + lastN = 5; + } + } + checkFormat(job, numLinesPerMap, lastN); } } - void checkFormat(Job job, int expectedN) + void checkFormat(Job job, int expectedN, int lastN) throws IOException, InterruptedException { NLineInputFormat format = new NLineInputFormat(); List splits = format.getSplits(job); - // check all splits except last one int count = 0; - for (int i = 0; i < splits.size() -1; i++) { + for (int i = 0; i < splits.size(); i++) { assertEquals("There are no split locations", 0, splits.get(i).getLocations().length); TaskAttemptContext context = MapReduceTestUtil. @@ -104,8 +107,13 @@ public class TestNLineInputFormat extends TestCase { } finally { reader.close(); } - assertEquals("number of lines in split is " + expectedN , - expectedN, count); + if ( i == splits.size() - 1) { + assertEquals("number of lines in split(" + i + ") is wrong" , + lastN, count); + } else { + assertEquals("number of lines in split(" + i + ") is wrong" , + expectedN, count); + } } }