From ff8caccc8740e6cd72d81d435542ced338a37002 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 13 May 2016 14:33:48 +0000 Subject: [PATCH] MAPREDUCE-6558. multibyte delimiters with compressed input files generate duplicate records. Contributed by Wilfred Spiegelenburg (cherry picked from commit 9227dfc25f373a99cb66ad7d6bacef8dcf336f77) --- .../lib/input/CompressedSplitLineReader.java | 5 +++ .../hadoop/mapred/TestLineRecordReader.java | 29 ++++++++++++++++++ .../lib/input/TestLineRecordReader.java | 29 ++++++++++++++++++ .../compressedMultibyteDelimiter.txt.bz2 | Bin 0 -> 1096 bytes 4 files changed, 63 insertions(+) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/compressedMultibyteDelimiter.txt.bz2 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java index ef51f5cc678..9d0e949a10b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java @@ -165,4 +165,9 @@ public class CompressedSplitLineReader extends SplitLineReader { public boolean needAdditionalRecordAfterSplit() { return !finished && needAdditionalRecord; } + + @Override + protected void unsetNeedAdditionalRecordAfterSplit() { + needAdditionalRecord = false; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index f50e1efb7ba..844250bf2b2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -654,4 +654,33 @@ public class TestLineRecordReader { assertFalse(reader.next(key, value)); assertEquals(12, reader.getPos()); } + + @Test + public void testBzipWithMultibyteDelimiter() throws IOException { + String testFileName = "compressedMultibyteDelimiter.txt.bz2"; + // firstSplitLength < (headers + blockMarker) will pass always since no + // records will be read (in the test file that is byte 0..9) + // firstSplitlength > (compressed file length - one compressed block + // size + 1) will also always pass since the second split will be empty + // (833 bytes is the last block start in the used data file) + int firstSplitLength = 100; + URL testFileUrl = getClass().getClassLoader().getResource(testFileName); + assertNotNull("Cannot find " + testFileName, testFileUrl); + File testFile = new File(testFileUrl.getFile()); + long testFileSize = testFile.length(); + Path testFilePath = new Path(testFile.getAbsolutePath()); + assertTrue("Split size is smaller than header length", + firstSplitLength > 9); + assertTrue("Split size is larger than compressed file size " + + testFilePath, testFileSize > firstSplitLength); + + Configuration conf = new Configuration(); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + + String delimiter = "\r\r\n"; + conf.set("textinputformat.record.delimiter", delimiter); + testSplitRecordsForFile(conf, firstSplitLength, testFileSize, + testFilePath); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index 6819af7e97b..716c4a6c081 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -617,4 +617,33 @@ public class TestLineRecordReader { // Key should be 12 right after "123456789\r\r\n" assertEquals(12, key.get()); } + + @Test + public void testBzipWithMultibyteDelimiter() throws IOException { + String testFileName = "compressedMultibyteDelimiter.txt.bz2"; + // firstSplitLength < (headers + blockMarker) will pass always since no + // records will be read (in the test file that is byte 0..9) + // firstSplitlength > (compressed file length - one compressed block + // size + 1) will also always pass since the second split will be empty + // (833 bytes is the last block start in the used data file) + int firstSplitLength = 100; + URL testFileUrl = getClass().getClassLoader().getResource(testFileName); + assertNotNull("Cannot find " + testFileName, testFileUrl); + File testFile = new File(testFileUrl.getFile()); + long testFileSize = testFile.length(); + Path testFilePath = new Path(testFile.getAbsolutePath()); + assertTrue("Split size is smaller than header length", + firstSplitLength > 9); + assertTrue("Split size is larger than compressed file size " + + testFilePath, testFileSize > firstSplitLength); + + Configuration conf = new Configuration(); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + + String delimiter = "\r\r\n"; + conf.set("textinputformat.record.delimiter", delimiter); + testSplitRecordsForFile(conf, firstSplitLength, testFileSize, + testFilePath); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/compressedMultibyteDelimiter.txt.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/compressedMultibyteDelimiter.txt.bz2 new file mode 100644 index 0000000000000000000000000000000000000000..f8e178f08a83887e7fea00ddee09fe7e25ddf779 GIT binary patch literal 1096 zcmZ>Y$}lu^j8qGbJRNqXj6pdlzJbBWfw}%EyDA5R^71(h4vcq7m?TmXd=1o`h27XB zC4_xACmgxhD8Fh^Rr6P8UlRs~FO6Pa^CmDbWH1zeywEPok;+jN(5}l-9MEpdQP353 zMR6-fu9I{?+ZDx&PO>J7HBO3x!p6oXOEo=bUT0?PcxTfW$iZUw>O<%i7YmCCZ5?gp zsT^qmZ7?Ee)#ebzSx!=C7V>6tWE-`ZE>T=_O5oarw#`aY*TR*Rdxo;T+8+AEYlZow zRs2Rw(tooKg*xu@T49^As<|}AWmR+2+Eq{G6B6_rA4KVhiZ=d;u;X@hb!B(#xL_!M zCLqjn%C(Z)aVHvdrcZysz%zm2{!hn~P7E9`pEC$Bw97Imwz;ufW=b(MNl2X7XmVgC zkFSK_uT{!d>_dPdz^@@PRg3M)L0kU-X%7?Gld@kFA3DjID866FcT3>iLcU)D9~SaU z2|RU@S>ZgDqi7*th~h~_K|!yeDND`(!MPdIuI+_Sid>W$zN~(>dM-!tLf)+$sS39@ zO}q>Wwyhjh9qlp@7BnQ`L3&W&ahJeN4+}xQE4@zASpruSw=Lv_Ft)oaVc)jy)V`G- z{3=V>Z>cg~Tes1JUtufK2@P@Bj_iC?b$?7@$OdktYtl>ZO@I#!ImSNwa z!v-7vB{gmGS+*(v`&~9hssbF9@tjv_zXe2Sqz~IDz zSqVOEYCSwg25K{e6+0I*EY+H~_~NT&mohRg+?thC>BoAk#nee&O8^)oA;7?qw-WdW z3X-cFSwL~Ag}ec6E1X@;@7I36Uz;6uZR^_X33dywShlaIT(Q88`2*i4#S@!ajE!21 zpoGQg(;Ch|4k}@`kavZ%aqbFq8G~h@Kuk*oCQTrb=yOJpUsZ*@pn(6>B<4#?8vbf5 zuxz)Q#Jv5YC*!##4Rh`a(%I%=yXWiDT0Vsb&w!{k$=S75N)-+;>w3~rnUz1GbV zcnk~&zAS;~3;DVP9)fZTD9E5e0}K~XsN~<@zpu}5O}c#Gn?-_`Y%?CXSOm1Cu{cj# z8lDHr3OO!jx6lIx7Dh>m+pZ`c^;-MF87QIwB&H^Wtmd&4YRT(KF5ju)=CV+pOHuMz z%0fw@mNYNH>k^Y%O5JWel+}1Dlkr?ynd6el;$H^+H*QL~P26VGn=En7{^*^Fej+Mj JQ|{&;007O&v(Nwl literal 0 HcmV?d00001