svn merge -c 1407505 FIXES: MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit (Mark Fuhs via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1407506 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c6dd3c0263
commit
82ccfb726a
|
@ -494,6 +494,9 @@ Release 0.23.5 - UNRELEASED
|
||||||
MAPREDUCE-4772. Fetch failures can take way too long for a map to be
|
MAPREDUCE-4772. Fetch failures can take way too long for a map to be
|
||||||
restarted (bobby)
|
restarted (bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
|
||||||
|
(Mark Fuhs via bobby)
|
||||||
|
|
||||||
Release 0.23.4 - UNRELEASED
|
Release 0.23.4 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -107,25 +107,14 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
|
||||||
numLines++;
|
numLines++;
|
||||||
length += num;
|
length += num;
|
||||||
if (numLines == numLinesPerSplit) {
|
if (numLines == numLinesPerSplit) {
|
||||||
// NLineInputFormat uses LineRecordReader, which always reads
|
splits.add(createFileSplit(fileName, begin, length));
|
||||||
// (and consumes) at least one character out of its upper split
|
|
||||||
// boundary. So to make sure that each mapper gets N lines, we
|
|
||||||
// move back the upper split limits of each split
|
|
||||||
// by one character here.
|
|
||||||
if (begin == 0) {
|
|
||||||
splits.add(new FileSplit(fileName, begin, length - 1,
|
|
||||||
new String[] {}));
|
|
||||||
} else {
|
|
||||||
splits.add(new FileSplit(fileName, begin - 1, length,
|
|
||||||
new String[] {}));
|
|
||||||
}
|
|
||||||
begin += length;
|
begin += length;
|
||||||
length = 0;
|
length = 0;
|
||||||
numLines = 0;
|
numLines = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (numLines != 0) {
|
if (numLines != 0) {
|
||||||
splits.add(new FileSplit(fileName, begin, length, new String[]{}));
|
splits.add(createFileSplit(fileName, begin, length));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (lr != null) {
|
if (lr != null) {
|
||||||
|
@ -135,6 +124,23 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
|
||||||
return splits;
|
return splits;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NLineInputFormat uses LineRecordReader, which always reads
|
||||||
|
* (and consumes) at least one character out of its upper split
|
||||||
|
* boundary. So to make sure that each mapper gets N lines, we
|
||||||
|
* move back the upper split limits of each split
|
||||||
|
* by one character here.
|
||||||
|
* @param fileName Path of file
|
||||||
|
* @param begin the position of the first byte in the file to process
|
||||||
|
* @param length number of bytes in InputSplit
|
||||||
|
* @return FileSplit
|
||||||
|
*/
|
||||||
|
protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
|
||||||
|
return (begin == 0)
|
||||||
|
? new FileSplit(fileName, begin, length - 1, new String[] {})
|
||||||
|
: new FileSplit(fileName, begin - 1, length, new String[] {});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the number of lines per split
|
* Set the number of lines per split
|
||||||
* @param job the job to modify
|
* @param job the job to modify
|
||||||
|
|
|
@ -50,37 +50,40 @@ public class TestNLineInputFormat extends TestCase {
|
||||||
Job job = Job.getInstance(conf);
|
Job job = Job.getInstance(conf);
|
||||||
Path file = new Path(workDir, "test.txt");
|
Path file = new Path(workDir, "test.txt");
|
||||||
|
|
||||||
int seed = new Random().nextInt();
|
|
||||||
Random random = new Random(seed);
|
|
||||||
|
|
||||||
localFs.delete(workDir, true);
|
localFs.delete(workDir, true);
|
||||||
FileInputFormat.setInputPaths(job, workDir);
|
FileInputFormat.setInputPaths(job, workDir);
|
||||||
int numLinesPerMap = 5;
|
int numLinesPerMap = 5;
|
||||||
NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
|
NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
|
||||||
// for a variety of lengths
|
|
||||||
for (int length = 0; length < MAX_LENGTH;
|
for (int length = 0; length < MAX_LENGTH;
|
||||||
length += random.nextInt(MAX_LENGTH / 10) + 1) {
|
length += 1) {
|
||||||
|
|
||||||
// create a file with length entries
|
// create a file with length entries
|
||||||
Writer writer = new OutputStreamWriter(localFs.create(file));
|
Writer writer = new OutputStreamWriter(localFs.create(file));
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < length; i++) {
|
for (int i = 0; i < length; i++) {
|
||||||
writer.write(Integer.toString(i));
|
writer.write(Integer.toString(i)+" some more text");
|
||||||
writer.write("\n");
|
writer.write("\n");
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
checkFormat(job, numLinesPerMap);
|
int lastN = 0;
|
||||||
|
if (length != 0) {
|
||||||
|
lastN = length % 5;
|
||||||
|
if (lastN == 0) {
|
||||||
|
lastN = 5;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkFormat(job, numLinesPerMap, lastN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void checkFormat(Job job, int expectedN)
|
void checkFormat(Job job, int expectedN, int lastN)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
NLineInputFormat format = new NLineInputFormat();
|
NLineInputFormat format = new NLineInputFormat();
|
||||||
List<InputSplit> splits = format.getSplits(job);
|
List<InputSplit> splits = format.getSplits(job);
|
||||||
// check all splits except last one
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (int i = 0; i < splits.size() -1; i++) {
|
for (int i = 0; i < splits.size(); i++) {
|
||||||
assertEquals("There are no split locations", 0,
|
assertEquals("There are no split locations", 0,
|
||||||
splits.get(i).getLocations().length);
|
splits.get(i).getLocations().length);
|
||||||
TaskAttemptContext context = MapReduceTestUtil.
|
TaskAttemptContext context = MapReduceTestUtil.
|
||||||
|
@ -104,10 +107,15 @@ public class TestNLineInputFormat extends TestCase {
|
||||||
} finally {
|
} finally {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
assertEquals("number of lines in split is " + expectedN ,
|
if ( i == splits.size() - 1) {
|
||||||
|
assertEquals("number of lines in split(" + i + ") is wrong" ,
|
||||||
|
lastN, count);
|
||||||
|
} else {
|
||||||
|
assertEquals("number of lines in split(" + i + ") is wrong" ,
|
||||||
expectedN, count);
|
expectedN, count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
new TestNLineInputFormat().testFormat();
|
new TestNLineInputFormat().testFormat();
|
||||||
|
|
Loading…
Reference in New Issue