MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit (Mark Fuhs via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1407505 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-11-09 15:52:33 +00:00
parent 933a6d2c1e
commit 1f40b8b4e8
3 changed files with 42 additions and 25 deletions

View File

@ -641,6 +641,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4772. Fetch failures can take way too long for a map to be MAPREDUCE-4772. Fetch failures can take way too long for a map to be
restarted (bobby) restarted (bobby)
MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
(Mark Fuhs via bobby)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -107,25 +107,14 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
numLines++; numLines++;
length += num; length += num;
if (numLines == numLinesPerSplit) { if (numLines == numLinesPerSplit) {
// NLineInputFormat uses LineRecordReader, which always reads splits.add(createFileSplit(fileName, begin, length));
// (and consumes) at least one character out of its upper split
// boundary. So to make sure that each mapper gets N lines, we
// move back the upper split limits of each split
// by one character here.
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
}
begin += length; begin += length;
length = 0; length = 0;
numLines = 0; numLines = 0;
} }
} }
if (numLines != 0) { if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length, new String[]{})); splits.add(createFileSplit(fileName, begin, length));
} }
} finally { } finally {
if (lr != null) { if (lr != null) {
@ -135,6 +124,23 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
return splits; return splits;
} }
/**
* NLineInputFormat uses LineRecordReader, which always reads
* (and consumes) at least one character out of its upper split
* boundary. So to make sure that each mapper gets N lines, we
* move back the upper split limits of each split
* by one character here.
* @param fileName Path of file
* @param begin the position of the first byte in the file to process
* @param length number of bytes in InputSplit
* @return FileSplit
*/
protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
return (begin == 0)
? new FileSplit(fileName, begin, length - 1, new String[] {})
: new FileSplit(fileName, begin - 1, length, new String[] {});
}
/** /**
* Set the number of lines per split * Set the number of lines per split
* @param job the job to modify * @param job the job to modify

View File

@ -50,37 +50,40 @@ public class TestNLineInputFormat extends TestCase {
Job job = Job.getInstance(conf); Job job = Job.getInstance(conf);
Path file = new Path(workDir, "test.txt"); Path file = new Path(workDir, "test.txt");
int seed = new Random().nextInt();
Random random = new Random(seed);
localFs.delete(workDir, true); localFs.delete(workDir, true);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
int numLinesPerMap = 5; int numLinesPerMap = 5;
NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap); NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
// for a variety of lengths
for (int length = 0; length < MAX_LENGTH; for (int length = 0; length < MAX_LENGTH;
length += random.nextInt(MAX_LENGTH / 10) + 1) { length += 1) {
// create a file with length entries // create a file with length entries
Writer writer = new OutputStreamWriter(localFs.create(file)); Writer writer = new OutputStreamWriter(localFs.create(file));
try { try {
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
writer.write(Integer.toString(i)); writer.write(Integer.toString(i)+" some more text");
writer.write("\n"); writer.write("\n");
} }
} finally { } finally {
writer.close(); writer.close();
} }
checkFormat(job, numLinesPerMap); int lastN = 0;
if (length != 0) {
lastN = length % 5;
if (lastN == 0) {
lastN = 5;
}
}
checkFormat(job, numLinesPerMap, lastN);
} }
} }
void checkFormat(Job job, int expectedN) void checkFormat(Job job, int expectedN, int lastN)
throws IOException, InterruptedException { throws IOException, InterruptedException {
NLineInputFormat format = new NLineInputFormat(); NLineInputFormat format = new NLineInputFormat();
List<InputSplit> splits = format.getSplits(job); List<InputSplit> splits = format.getSplits(job);
// check all splits except last one
int count = 0; int count = 0;
for (int i = 0; i < splits.size() -1; i++) { for (int i = 0; i < splits.size(); i++) {
assertEquals("There are no split locations", 0, assertEquals("There are no split locations", 0,
splits.get(i).getLocations().length); splits.get(i).getLocations().length);
TaskAttemptContext context = MapReduceTestUtil. TaskAttemptContext context = MapReduceTestUtil.
@ -104,10 +107,15 @@ public class TestNLineInputFormat extends TestCase {
} finally { } finally {
reader.close(); reader.close();
} }
assertEquals("number of lines in split is " + expectedN , if ( i == splits.size() - 1) {
assertEquals("number of lines in split(" + i + ") is wrong" ,
lastN, count);
} else {
assertEquals("number of lines in split(" + i + ") is wrong" ,
expectedN, count); expectedN, count);
} }
} }
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
new TestNLineInputFormat().testFormat(); new TestNLineInputFormat().testFormat();