diff --git a/hadoop-client-modules/hadoop-client-minicluster/pom.xml b/hadoop-client-modules/hadoop-client-minicluster/pom.xml
index 905d53a38fa..a443648fdc3 100644
--- a/hadoop-client-modules/hadoop-client-minicluster/pom.xml
+++ b/hadoop-client-modules/hadoop-client-minicluster/pom.xml
@@ -615,6 +615,7 @@
                         <exclude>testjar/*</exclude>
                         <exclude>testshell/*</exclude>
+                        <exclude>testdata/*</exclude>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 3c78cfce55c..99590eda679 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -180,7 +180,8 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
         new DecompressorStream(in, decompressor,
             conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
                 IO_FILE_BUFFER_SIZE_DEFAULT)) :
-        new BZip2CompressionInputStream(in);
+        new BZip2CompressionInputStream(
+            in, 0L, Long.MAX_VALUE, READ_MODE.BYBLOCK);
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestConcatenatedCompressedInput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestConcatenatedCompressedInput.java
index 977d083dff7..af6b9529e02 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestConcatenatedCompressedInput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestConcatenatedCompressedInput.java
@@ -18,18 +18,6 @@
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.zip.Inflater;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -42,16 +30,26 @@ import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.Inflater;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test class for concatenated {@link CompressionInputStream}.
+ */
 public class TestConcatenatedCompressedInput {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestConcatenatedCompressedInput.class);
-  private static int MAX_LENGTH = 10000;
   private static JobConf defaultConf = new JobConf();
   private static FileSystem localFs = null;
@@ -85,13 +83,15 @@ public class TestConcatenatedCompressedInput {
   public void after() {
     ZlibFactory.loadNativeZLib();
   }
+
+  private static final String DEFAULT_WORK_DIR = "target/test-classes/testdata";
 
   private static Path workDir = localFs.makeQualified(new Path(
-      System.getProperty("test.build.data", "/tmp"),
+      System.getProperty("test.build.data", DEFAULT_WORK_DIR),
       "TestConcatenatedCompressedInput"));
 
   private static LineReader makeStream(String str) throws IOException {
-    return new LineReader(new ByteArrayInputStream(str.getBytes("UTF-8")),
-                          defaultConf);
+    return new LineReader(new ByteArrayInputStream(
+        str.getBytes("UTF-8")), defaultConf);
   }
 
   private static void writeFile(FileSystem fs, Path name,
@@ -190,7 +190,8 @@ public class TestConcatenatedCompressedInput {
     // copy prebuilt (correct!) version of concat.gz to HDFS
     final String fn = "concat" + gzip.getDefaultExtension();
-    Path fnLocal = new Path(System.getProperty("test.concat.data", "/tmp"), fn);
+    Path fnLocal = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn);
     Path fnHDFS = new Path(workDir, fn);
     localFs.copyFromLocalFile(fnLocal, fnHDFS);
@@ -227,7 +228,7 @@ public class TestConcatenatedCompressedInput {
   @Test
   public void testPrototypeInflaterGzip() throws IOException {
     CompressionCodec gzip = new GzipCodec();  // used only for file extension
-    localFs.delete(workDir, true);            // localFs = FileSystem instance
+    localFs.delete(workDir, true);    // localFs = FileSystem instance
 
     System.out.println(COLOR_BR_BLUE + "testPrototypeInflaterGzip() using " +
         "non-native/Java Inflater and manual gzip header/trailer parsing" +
@@ -235,7 +236,8 @@ public class TestConcatenatedCompressedInput {
     // copy prebuilt (correct!) version of concat.gz to HDFS
     final String fn = "concat" + gzip.getDefaultExtension();
-    Path fnLocal = new Path(System.getProperty("test.concat.data", "/tmp"), fn);
+    Path fnLocal = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn);
     Path fnHDFS = new Path(workDir, fn);
     localFs.copyFromLocalFile(fnLocal, fnHDFS);
@@ -326,14 +328,16 @@ public class TestConcatenatedCompressedInput {
     // copy single-member test file to HDFS
     String fn1 = "testConcatThenCompress.txt" + gzip.getDefaultExtension();
-    Path fnLocal1 = new Path(System.getProperty("test.concat.data","/tmp"),fn1);
+    Path fnLocal1 = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn1);
     Path fnHDFS1 = new Path(workDir, fn1);
     localFs.copyFromLocalFile(fnLocal1, fnHDFS1);
 
     // copy multiple-member test file to HDFS
     // (actually in "seekable gzip" format, a la JIRA PIG-42)
     String fn2 = "testCompressThenConcat.txt" + gzip.getDefaultExtension();
-    Path fnLocal2 = new Path(System.getProperty("test.concat.data","/tmp"),fn2);
+    Path fnLocal2 = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn2);
     Path fnHDFS2 = new Path(workDir, fn2);
     localFs.copyFromLocalFile(fnLocal2, fnHDFS2);
@@ -439,7 +443,8 @@ public class TestConcatenatedCompressedInput {
     InputSplit[] splits = format.getSplits(jConf, 100);
     assertEquals("compressed splits == 2", 2, splits.length);
     FileSplit tmp = (FileSplit) splits[0];
-    if (tmp.getPath().getName().equals("testCompressThenConcat.txt.gz")) {
+    if (tmp.getPath()
+        .getName().equals("testdata/testCompressThenConcat.txt.gz")) {
       System.out.println("  (swapping)");
       splits[0] = splits[1];
       splits[1] = tmp;
@@ -481,7 +486,8 @@ public class TestConcatenatedCompressedInput {
     // copy prebuilt (correct!) version of concat.bz2 to HDFS
     final String fn = "concat" + bzip2.getDefaultExtension();
-    Path fnLocal = new Path(System.getProperty("test.concat.data", "/tmp"), fn);
+    Path fnLocal = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn);
     Path fnHDFS = new Path(workDir, fn);
     localFs.copyFromLocalFile(fnLocal, fnHDFS);
@@ -531,13 +537,15 @@ public class TestConcatenatedCompressedInput {
     // copy single-member test file to HDFS
     String fn1 = "testConcatThenCompress.txt" + bzip2.getDefaultExtension();
-    Path fnLocal1 = new Path(System.getProperty("test.concat.data","/tmp"),fn1);
+    Path fnLocal1 = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn1);
     Path fnHDFS1 = new Path(workDir, fn1);
     localFs.copyFromLocalFile(fnLocal1, fnHDFS1);
 
     // copy multiple-member test file to HDFS
     String fn2 = "testCompressThenConcat.txt" + bzip2.getDefaultExtension();
-    Path fnLocal2 = new Path(System.getProperty("test.concat.data","/tmp"),fn2);
+    Path fnLocal2 = new Path(
+        System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn2);
     Path fnHDFS2 = new Path(workDir, fn2);
     localFs.copyFromLocalFile(fnLocal2, fnHDFS2);
@@ -549,21 +557,6 @@ public class TestConcatenatedCompressedInput {
     assertEquals("concat bytes available", 2567, in1.available());
     assertEquals("concat bytes available", 3056, in2.available());
 
-/*
-    // FIXME
-    // The while-loop below dies at the beginning of the 2nd concatenated
-    // member (after 17 lines successfully read) with:
-    //
-    //   java.io.IOException: bad block header
-    //   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(
-    //   CBZip2InputStream.java:527)
-    //
-    // It is not critical to concatenated-gzip support, HADOOP-6835, so it's
-    // simply commented out for now (and HADOOP-6852 filed).  If and when the
-    // latter issue is resolved--perhaps by fixing an error here--this code
-    // should be reenabled.  Note that the doMultipleBzip2BufferSizes() test
-    // below uses the same testCompressThenConcat.txt.bz2 file but works fine.
-
     CompressionInputStream cin2 = bzip2.createInputStream(in2);
     LineReader in = new LineReader(cin2);
     Text out = new Text();
@@ -578,7 +571,6 @@ public class TestConcatenatedCompressedInput {
                  5346, totalBytes);
     assertEquals("total uncompressed lines in concatenated test file",
                  84, lineNum);
- */
 
     // test CBZip2InputStream with lots of different input-buffer sizes
     doMultipleBzip2BufferSizes(jobConf);
@@ -645,7 +637,8 @@ public class TestConcatenatedCompressedInput {
   // this tests both files (testCompressThenConcat, testConcatThenCompress); all
   // should work with existing Java bzip2 decoder and any future native version
-  private static void doSingleBzip2BufferSize(JobConf jConf) throws IOException {
+  private static void doSingleBzip2BufferSize(JobConf jConf)
+      throws IOException {
     TextInputFormat format = new TextInputFormat();
     format.configure(jConf);
     format.setMinSplitSize(5500);  // work around 256-byte/22-splits issue
@@ -654,7 +647,8 @@ public class TestConcatenatedCompressedInput {
     InputSplit[] splits = format.getSplits(jConf, 100);
     assertEquals("compressed splits == 2", 2, splits.length);
     FileSplit tmp = (FileSplit) splits[0];
-    if (tmp.getPath().getName().equals("testCompressThenConcat.txt.gz")) {
+    if (tmp.getPath()
+        .getName().equals("testdata/testCompressThenConcat.txt.gz")) {
       System.out.println("  (swapping)");
       splits[0] = splits[1];
       splits[1] = tmp;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/concat.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/concat.bz2
new file mode 100644
index 00000000000..f31fb0c32bb
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/concat.bz2 differ
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/concat.gz b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/concat.gz
new file mode 100644
index 00000000000..53d5a07fcae
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/concat.gz differ
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testCompressThenConcat.txt.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testCompressThenConcat.txt.bz2
new file mode 100644
index 00000000000..a21c0e2c10b
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testCompressThenConcat.txt.bz2 differ
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testCompressThenConcat.txt.gz b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testCompressThenConcat.txt.gz
new file mode 100644
index 00000000000..75e5f8c7f74
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testCompressThenConcat.txt.gz differ
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testConcatThenCompress.txt.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testConcatThenCompress.txt.bz2
new file mode 100644
index 00000000000..5983e52cc03
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testConcatThenCompress.txt.bz2 differ
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testConcatThenCompress.txt.gz b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testConcatThenCompress.txt.gz
new file mode 100644
index 00000000000..6e8eaa56f78
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/testdata/testConcatThenCompress.txt.gz differ