MAPREDUCE-5777. Support utf-8 text with Byte Order Marker. (Zhihai Xu via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1600979 13f79535-47bb-0310-9956-ffa450edef68
commit 3ed429bb62
parent d7f5d6795c
@@ -105,6 +105,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5895. Close streams properly to avoid leakage in TaskLog.
     (Kousuke Saruta via devaraj)

+    MAPREDUCE-5777. Support utf-8 text with Byte Order Marker.
+    (Zhihai Xu via kasha)
+
 Release 2.4.1 - UNRELEASED

   INCOMPATIBLE CHANGES
@@ -91,6 +91,7 @@
         <configuration>
           <excludes>
             <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+            <exclude>src/test/resources/testBOM.txt</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -197,6 +197,39 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     return retVal;
   }

+  private int skipUtfByteOrderMark(Text value) throws IOException {
+    // Strip the BOM (Byte Order Mark). Text only supports UTF-8, so we only
+    // need to check for the UTF-8 BOM (0xEF,0xBB,0xBF) at the start of the
+    // text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+    // Even though we read 3 extra bytes for the first line, existing behavior
+    // is not altered (no backwards-incompatibility issue): newSize is less
+    // than maxLineLength, and the number of bytes copied to Text is never
+    // more than newSize. If the size returned by readLine is not less than
+    // maxLineLength, the current line is discarded and the next line is read.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte) 0xEF) &&
+        (textBytes[1] == (byte) 0xBB) && (textBytes[2] == (byte) 0xBF)) {
+      // Found the UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It may work to use the same buffer and not do the copyBytes.
+        textBytes = value.copyBytes();
+        value.set(textBytes, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
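The heart of the patch is the three-byte comparison above. As a minimal standalone sketch of the same check, runnable outside the record reader (the stripBom helper name is hypothetical, not code from this commit):

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.Text;

public class BomStripSketch {
  // Hypothetical helper mirroring the patch's check: if the value starts
  // with the UTF-8 BOM bytes (0xEF, 0xBB, 0xBF), drop them in place.
  static void stripBom(Text value) {
    byte[] bytes = value.getBytes();     // backing buffer, may be oversized
    int length = value.getLength();      // count of valid bytes in the buffer
    if (length >= 3 && bytes[0] == (byte) 0xEF &&
        bytes[1] == (byte) 0xBB && bytes[2] == (byte) 0xBF) {
      if (length > 3) {
        byte[] copy = value.copyBytes(); // exact-length copy, like the patch
        value.set(copy, 3, length - 3);
      } else {
        value.clear();                   // the line was only a BOM
      }
    }
  }

  public static void main(String[] args) {
    Text t = new Text();
    byte[] line = "\uFEFFhello".getBytes(StandardCharsets.UTF_8);
    t.set(line, 0, line.length);
    stripBom(t);
    System.out.println(t);               // prints "hello"
  }
}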
@@ -206,11 +239,17 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);

-      int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+      int newSize = 0;
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark(value);
+      } else {
+        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+        pos += newSize;
+      }
+
       if (newSize == 0) {
         return false;
       }
-      pos += newSize;
       if (newSize < maxLineLength) {
         return true;
       }
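The guard above runs the BOM check only when pos == 0, i.e. at the very start of the file; everywhere else the line reader's bytes are taken as-is. And raw they are: the sketch below uses org.apache.hadoop.util.LineReader (the in field in these classes may be a split-aware variant of it, so this is illustrative rather than the exact class) to show that the BOM bytes come back as part of the first line, which is why the record reader has to strip them itself:

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class RawLineSketch {
  public static void main(String[] args) throws Exception {
    byte[] data = "\uFEFFfirst\nsecond\n".getBytes(StandardCharsets.UTF_8);
    LineReader in = new LineReader(new ByteArrayInputStream(data));
    Text line = new Text();
    int n = in.readLine(line);                // bytes consumed, newline included
    System.out.println(n);                    // 9: 3 BOM bytes + "first" + '\n'
    System.out.println(line.getLength());     // 8: the BOM is part of the value
    in.close();
  }
}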
@@ -134,6 +134,39 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     return retVal;
   }

+  private int skipUtfByteOrderMark() throws IOException {
+    // Strip the BOM (Byte Order Mark). Text only supports UTF-8, so we only
+    // need to check for the UTF-8 BOM (0xEF,0xBB,0xBF) at the start of the
+    // text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+    // Even though we read 3 extra bytes for the first line, existing behavior
+    // is not altered (no backwards-incompatibility issue): newSize is less
+    // than maxLineLength, and the number of bytes copied to Text is never
+    // more than newSize. If the size returned by readLine is not less than
+    // maxLineLength, the current line is discarded and the next line is read.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte) 0xEF) &&
+        (textBytes[1] == (byte) 0xBB) && (textBytes[2] == (byte) 0xBF)) {
+      // Found the UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It may work to use the same buffer and not do the copyBytes.
+        textBytes = value.copyBytes();
+        value.set(textBytes, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
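The new-API variant above is identical to the mapred one except that value is a field rather than a parameter. A detail shared by both copies: the 3-byte allowance is added in long arithmetic and clamped to Integer.MAX_VALUE, because maxLineLength can itself be Integer.MAX_VALUE (the tests below set it to exactly that), and 3 + maxLineLength computed in int would overflow. A quick demonstration:

public class OverflowGuardSketch {
  public static void main(String[] args) {
    int maxLineLength = Integer.MAX_VALUE;  // as configured in the tests below
    int broken = 3 + maxLineLength;         // int overflow: wraps negative
    int guarded = (int) Math.min(3L + (long) maxLineLength, Integer.MAX_VALUE);
    System.out.println(broken);             // -2147483646
    System.out.println(guarded);            // 2147483647
  }
}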
@@ -146,9 +179,14 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
-      pos += newSize;
-      if (newSize < maxLineLength) {
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark();
+      } else {
+        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+        pos += newSize;
+      }
+
+      if ((newSize == 0) || (newSize < maxLineLength)) {
         break;
       }
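The two tests that follow compare the first record against "\uFEFF": U+FEFF is the code point that the UTF-8 bytes 0xEF,0xBB,0xBF decode to, so a BOM that survived into the record would appear at the start of value.toString(). A quick check of that correspondence:

import java.nio.charset.StandardCharsets;

public class BomBytesSketch {
  public static void main(String[] args) {
    byte[] bom = "\uFEFF".getBytes(StandardCharsets.UTF_8);
    for (byte b : bom) {
      System.out.printf("%02X ", b);  // prints: EF BB BF
    }
  }
}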
@@ -188,4 +188,41 @@ public class TestLineRecordReader {
     checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
                                       200 * 1000, true);
   }
+
+  @Test
+  public void testStripBOM() throws IOException {
+    // the test data contains a BOM at the start of the file
+    // confirm the BOM is skipped by LineRecordReader
+    String UTF8_BOM = "\uFEFF";
+    URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
+    assertNotNull("Cannot find testBOM.txt", testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[]) null);
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    while (reader.next(key, value)) {
+      if (firstLine) {
+        firstLine = false;
+        if (value.toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      }
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
 }
@@ -193,4 +193,42 @@ public class TestLineRecordReader {
                                       200 * 1000,
                                       true);
   }
+
+  @Test
+  public void testStripBOM() throws IOException {
+    // the test data contains a BOM at the start of the file
+    // confirm the BOM is skipped by LineRecordReader
+    String UTF8_BOM = "\uFEFF";
+    URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
+    assertNotNull("Cannot find testBOM.txt", testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[]) null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    while (reader.nextKeyValue()) {
+      if (firstLine) {
+        firstLine = false;
+        if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      }
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
 }
@@ -0,0 +1,2 @@
+BOM(Byte Order Mark) test file
+BOM(Byte Order Mark) test file
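For reference, a sketch (not part of the commit) of how a BOM-prefixed test file like this one can be produced: writing the character U+FEFF through a UTF-8 writer emits exactly the bytes 0xEF,0xBB,0xBF at the start of the file.

import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class MakeTestBomFile {
  public static void main(String[] args) throws Exception {
    try (Writer w = new OutputStreamWriter(
        new FileOutputStream("testBOM.txt"), StandardCharsets.UTF_8)) {
      w.write('\uFEFF');                         // leading UTF-8 BOM
      w.write("BOM(Byte Order Mark) test file\n");
      w.write("BOM(Byte Order Mark) test file\n");
    }
  }
}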