From d990c2f413cd976e7dce6fc139d394119a6928c2 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Thu, 11 Apr 2013 21:25:10 +0000 Subject: [PATCH] svn merge -c 1467090 FIXES: HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit tests. Contributed by Vadim Bondarev git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1467099 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 6 + .../io/compress/CompressDecompressTester.java | 524 ++++++++++++++++++ .../compress/TestCompressorDecompressor.java | 101 ++++ .../zlib/TestZlibCompressorDecompressor.java | 362 ++++++++++++ 4 files changed, 993 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 27a67f6cd90..9560907c614 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -116,6 +116,9 @@ Release 2.0.5-beta - UNRELEASED HADOOP-9222. Cover package with org.apache.hadoop.io.lz4 unit tests (Vadim Bondarev via jlowe) + HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit + tests (Vadim Bondarev via jlowe) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES @@ -1142,6 +1145,9 @@ Release 0.23.8 - UNRELEASED HADOOP-9222. Cover package with org.apache.hadoop.io.lz4 unit tests (Vadim Bondarev via jlowe) + HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit + tests (Vadim Bondarev via jlowe) + Release 0.23.7 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java new file mode 100644 index 00000000000..35f84b950e4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.lz4.Lz4Compressor; +import org.apache.hadoop.io.compress.snappy.SnappyCompressor; +import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor; +import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.log4j.Logger; +import org.junit.Assert; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import static org.junit.Assert.*; + +public class CompressDecompressTester { + + private static final Logger logger = Logger + .getLogger(CompressDecompressTester.class); + + private final byte[] originalRawData; + + private ImmutableList> pairs = ImmutableList.of(); + private ImmutableList.Builder> builder = ImmutableList.builder(); + + private ImmutableSet stateges = ImmutableSet.of(); + + private PreAssertionTester assertionDelegate; + + public CompressDecompressTester(byte[] originalRawData) { + this.originalRawData = Arrays.copyOf(originalRawData, + originalRawData.length); + this.assertionDelegate = new PreAssertionTester() { + + @Override + public ImmutableList> filterOnAssumeWhat( + ImmutableList> pairs) { + ImmutableList.Builder> builder = ImmutableList + .builder(); + + for (TesterPair pair : pairs) { + if (isAvailable(pair)) + builder.add(pair); + } + return builder.build(); + } + }; + } + + private static boolean isNativeSnappyLoadable() { + boolean snappyAvailable = false; + boolean loaded = false; + try { + System.loadLibrary("snappy"); + logger.warn("Snappy native library is available"); + snappyAvailable = true; + boolean hadoopNativeAvailable = NativeCodeLoader.isNativeCodeLoaded(); + loaded = snappyAvailable && hadoopNativeAvailable; + if (loaded) { + logger.info("Snappy native library loaded"); + } else { + logger.warn("Snappy native library not loaded"); + } + } catch (Throwable t) { + logger.warn("Failed to load snappy: ", t); + return false; + } + return loaded; + } + + public static CompressDecompressTester of( + byte[] rawData) { + return new CompressDecompressTester(rawData); + } + + + public CompressDecompressTester withCompressDecompressPair( + T compressor, E decompressor) { + addPair( + compressor, + decompressor, + Joiner.on("_").join(compressor.getClass().getCanonicalName(), + decompressor.getClass().getCanonicalName())); + return this; + } + + public CompressDecompressTester withTestCases( + ImmutableSet stateges) { + this.stateges = ImmutableSet.copyOf(stateges); + return this; + } + + private void addPair(T compressor, E decompressor, String name) { + builder.add(new TesterPair(name, compressor, decompressor)); + } + + public void test() throws InstantiationException, IllegalAccessException { + pairs = builder.build(); + pairs = assertionDelegate.filterOnAssumeWhat(pairs); + + for (TesterPair pair : pairs) { + for (CompressionTestStrategy strategy : stateges) { + strategy.getTesterStrategy().assertCompression(pair.getName(), + pair.getCompressor(), pair.getDecompressor(), + Arrays.copyOf(originalRawData, originalRawData.length)); + } + } + endAll(pairs); + } + + private void endAll(ImmutableList> pairs) { + for (TesterPair pair : pairs) + pair.end(); + } + + interface PreAssertionTester { + ImmutableList> filterOnAssumeWhat( + ImmutableList> pairs); + } + + public enum CompressionTestStrategy { + + COMPRESS_DECOMPRESS_ERRORS(new TesterCompressionStrategy() { + private final Joiner joiner = Joiner.on("- "); + + @Override + public void assertCompression(String name, Compressor compressor, + Decompressor decompressor, byte[] rawData) { + assertTrue(checkSetInputNullPointerException(compressor)); + assertTrue(checkSetInputNullPointerException(decompressor)); + + assertTrue(checkCompressArrayIndexOutOfBoundsException(compressor, + rawData)); + assertTrue(checkCompressArrayIndexOutOfBoundsException(decompressor, + rawData)); + + assertTrue(checkCompressNullPointerException(compressor, rawData)); + assertTrue(checkCompressNullPointerException(decompressor, rawData)); + + assertTrue(checkSetInputArrayIndexOutOfBoundsException(compressor)); + assertTrue(checkSetInputArrayIndexOutOfBoundsException(decompressor)); + } + + private boolean checkSetInputNullPointerException(Compressor compressor) { + try { + compressor.setInput(null, 0, 1); + } catch (NullPointerException npe) { + return true; + } catch (Exception ex) { + logger.error(joiner.join(compressor.getClass().getCanonicalName(), + "checkSetInputNullPointerException error !!!")); + } + return false; + } + + private boolean checkCompressNullPointerException(Compressor compressor, + byte[] rawData) { + try { + compressor.setInput(rawData, 0, rawData.length); + compressor.compress(null, 0, 1); + } catch (NullPointerException npe) { + return true; + } catch (Exception ex) { + logger.error(joiner.join(compressor.getClass().getCanonicalName(), + "checkCompressNullPointerException error !!!")); + } + return false; + } + + private boolean checkCompressNullPointerException( + Decompressor decompressor, byte[] rawData) { + try { + decompressor.setInput(rawData, 0, rawData.length); + decompressor.decompress(null, 0, 1); + } catch (NullPointerException npe) { + return true; + } catch (Exception ex) { + logger.error(joiner.join(decompressor.getClass().getCanonicalName(), + "checkCompressNullPointerException error !!!")); + } + return false; + } + + private boolean checkSetInputNullPointerException( + Decompressor decompressor) { + try { + decompressor.setInput(null, 0, 1); + } catch (NullPointerException npe) { + return true; + } catch (Exception ex) { + logger.error(joiner.join(decompressor.getClass().getCanonicalName(), + "checkSetInputNullPointerException error !!!")); + } + return false; + } + + private boolean checkSetInputArrayIndexOutOfBoundsException( + Compressor compressor) { + try { + compressor.setInput(new byte[] { (byte) 0 }, 0, -1); + } catch (ArrayIndexOutOfBoundsException e) { + return true; + } catch (Exception e) { + logger.error(joiner.join(compressor.getClass().getCanonicalName(), + "checkSetInputArrayIndexOutOfBoundsException error !!!")); + } + return false; + } + + private boolean checkCompressArrayIndexOutOfBoundsException( + Compressor compressor, byte[] rawData) { + try { + compressor.setInput(rawData, 0, rawData.length); + compressor.compress(new byte[rawData.length], 0, -1); + } catch (ArrayIndexOutOfBoundsException e) { + return true; + } catch (Exception e) { + logger.error(joiner.join(compressor.getClass().getCanonicalName(), + "checkCompressArrayIndexOutOfBoundsException error !!!")); + } + return false; + } + + private boolean checkCompressArrayIndexOutOfBoundsException( + Decompressor decompressor, byte[] rawData) { + try { + decompressor.setInput(rawData, 0, rawData.length); + decompressor.decompress(new byte[rawData.length], 0, -1); + } catch (ArrayIndexOutOfBoundsException e) { + return true; + } catch (Exception e) { + logger.error(joiner.join(decompressor.getClass().getCanonicalName(), + "checkCompressArrayIndexOutOfBoundsException error !!!")); + } + return false; + } + + private boolean checkSetInputArrayIndexOutOfBoundsException( + Decompressor decompressor) { + try { + decompressor.setInput(new byte[] { (byte) 0 }, 0, -1); + } catch (ArrayIndexOutOfBoundsException e) { + return true; + } catch (Exception e) { + logger.error(joiner.join(decompressor.getClass().getCanonicalName(), + "checkNullPointerException error !!!")); + } + return false; + } + + }), + + COMPRESS_DECOMPRESS_SINGLE_BLOCK(new TesterCompressionStrategy() { + final Joiner joiner = Joiner.on("- "); + + @Override + public void assertCompression(String name, Compressor compressor, + Decompressor decompressor, byte[] rawData) { + + int cSize = 0; + int decompressedSize = 0; + byte[] compressedResult = new byte[rawData.length]; + byte[] decompressedBytes = new byte[rawData.length]; + try { + assertTrue( + joiner.join(name, "compressor.needsInput before error !!!"), + compressor.needsInput()); + assertTrue( + joiner.join(name, "compressor.getBytesWritten before error !!!"), + compressor.getBytesWritten() == 0); + compressor.setInput(rawData, 0, rawData.length); + compressor.finish(); + while (!compressor.finished()) { + cSize += compressor.compress(compressedResult, 0, + compressedResult.length); + } + compressor.reset(); + + assertTrue( + joiner.join(name, "decompressor.needsInput() before error !!!"), + decompressor.needsInput()); + decompressor.setInput(compressedResult, 0, cSize); + assertFalse( + joiner.join(name, "decompressor.needsInput() after error !!!"), + decompressor.needsInput()); + while (!decompressor.finished()) { + decompressedSize = decompressor.decompress(decompressedBytes, 0, + decompressedBytes.length); + } + decompressor.reset(); + assertTrue(joiner.join(name, " byte size not equals error !!!"), + decompressedSize == rawData.length); + assertArrayEquals( + joiner.join(name, " byte arrays not equals error !!!"), rawData, + decompressedBytes); + } catch (Exception ex) { + fail(joiner.join(name, ex.getMessage())); + } + } + }), + + COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM(new TesterCompressionStrategy() { + final Joiner joiner = Joiner.on("- "); + final ImmutableMap, Integer> emptySize = ImmutableMap + .of(Lz4Compressor.class, 4, ZlibCompressor.class, 16, + SnappyCompressor.class, 4, BuiltInZlibDeflater.class, 16); + + @Override + void assertCompression(String name, Compressor compressor, + Decompressor decompressor, byte[] originalRawData) { + byte[] buf = null; + ByteArrayInputStream bytesIn = null; + BlockDecompressorStream blockDecompressorStream = null; + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + // close without write + try { + compressor.reset(); + // decompressor.end(); + BlockCompressorStream blockCompressorStream = new BlockCompressorStream( + bytesOut, compressor, 1024, 0); + blockCompressorStream.close(); + // check compressed output + buf = bytesOut.toByteArray(); + int emSize = emptySize.get(compressor.getClass()); + Assert.assertEquals( + joiner.join(name, "empty stream compressed output size != " + + emSize), emSize, buf.length); + // use compressed output as input for decompression + bytesIn = new ByteArrayInputStream(buf); + // create decompression stream + blockDecompressorStream = new BlockDecompressorStream(bytesIn, + decompressor, 1024); + // no byte is available because stream was closed + assertEquals(joiner.join(name, " return value is not -1"), -1, + blockDecompressorStream.read()); + } catch (IOException e) { + fail(joiner.join(name, e.getMessage())); + } finally { + if (blockDecompressorStream != null) + try { + bytesOut.close(); + blockDecompressorStream.close(); + bytesIn.close(); + blockDecompressorStream.close(); + } catch (IOException e) { + } + } + } + + }), + + COMPRESS_DECOMPRESS_BLOCK(new TesterCompressionStrategy() { + private final Joiner joiner = Joiner.on("- "); + private static final int BLOCK_SIZE = 512; + private final byte[] operationBlock = new byte[BLOCK_SIZE]; + // Use default of 512 as bufferSize and compressionOverhead of + // (1% of bufferSize + 12 bytes) = 18 bytes (zlib algorithm). + private static final int overheadSpace = BLOCK_SIZE / 100 + 12; + + @Override + public void assertCompression(String name, Compressor compressor, + Decompressor decompressor, byte[] originalRawData) { + int off = 0; + int len = originalRawData.length; + int maxSize = BLOCK_SIZE - overheadSpace; + int compresSize = 0; + List blockLabels = new ArrayList(); + ByteArrayOutputStream compressedOut = new ByteArrayOutputStream(); + ByteArrayOutputStream decompressOut = new ByteArrayOutputStream(); + try { + if (originalRawData.length > maxSize) { + do { + int bufLen = Math.min(len, maxSize); + compressor.setInput(originalRawData, off, bufLen); + compressor.finish(); + while (!compressor.finished()) { + compresSize = compressor.compress(operationBlock, 0, + operationBlock.length); + compressedOut.write(operationBlock, 0, compresSize); + blockLabels.add(compresSize); + } + compressor.reset(); + off += bufLen; + len -= bufLen; + } while (len > 0); + } + + off = 0; + // compressed bytes + byte[] compressedBytes = compressedOut.toByteArray(); + for (Integer step : blockLabels) { + decompressor.setInput(compressedBytes, off, step); + while (!decompressor.finished()) { + int dSize = decompressor.decompress(operationBlock, 0, + operationBlock.length); + decompressOut.write(operationBlock, 0, dSize); + } + decompressor.reset(); + off = off + step; + } + assertArrayEquals( + joiner.join(name, "byte arrays not equals error !!!"), + originalRawData, decompressOut.toByteArray()); + } catch (Exception ex) { + fail(joiner.join(name, ex.getMessage())); + } finally { + try { + compressedOut.close(); + } catch (IOException e) { + } + try { + decompressOut.close(); + } catch (IOException e) { + } + } + } + }); + + private final TesterCompressionStrategy testerStrategy; + + CompressionTestStrategy(TesterCompressionStrategy testStrategy) { + this.testerStrategy = testStrategy; + } + + public TesterCompressionStrategy getTesterStrategy() { + return testerStrategy; + } + } + + static final class TesterPair { + private final T compressor; + private final E decompressor; + private final String name; + + TesterPair(String name, T compressor, E decompressor) { + this.compressor = compressor; + this.decompressor = decompressor; + this.name = name; + } + + public void end() { + Configuration cfg = new Configuration(); + compressor.reinit(cfg); + compressor.end(); + decompressor.end(); + } + + public T getCompressor() { + return compressor; + } + + public E getDecompressor() { + return decompressor; + } + + public String getName() { + return name; + } + } + + /** + * Method for compressor availability check + */ + private static boolean isAvailable(TesterPair pair) { + Compressor compressor = pair.compressor; + + if (compressor.getClass().isAssignableFrom(Lz4Compressor.class) + && (NativeCodeLoader.isNativeCodeLoaded())) + return true; + + else if (compressor.getClass().isAssignableFrom(BuiltInZlibDeflater.class) + && NativeCodeLoader.isNativeCodeLoaded()) + return true; + + else if (compressor.getClass().isAssignableFrom(ZlibCompressor.class)) { + return ZlibFactory.isNativeZlibLoaded(new Configuration()); + } + else if (compressor.getClass().isAssignableFrom(SnappyCompressor.class) + && isNativeSnappyLoadable()) + return true; + + return false; + } + + abstract static class TesterCompressionStrategy { + + protected final Logger logger = Logger.getLogger(getClass()); + + abstract void assertCompression(String name, Compressor compressor, + Decompressor decompressor, byte[] originalRawData); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java new file mode 100644 index 00000000000..a8ac993c47b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import static org.junit.Assert.fail; +import java.util.Random; +import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy; +import org.apache.hadoop.io.compress.lz4.Lz4Compressor; +import org.apache.hadoop.io.compress.lz4.Lz4Decompressor; +import org.apache.hadoop.io.compress.snappy.SnappyCompressor; +import org.apache.hadoop.io.compress.snappy.SnappyDecompressor; +import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater; +import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater; +import org.junit.Test; +import com.google.common.collect.ImmutableSet; + +/** + * Test for pairs: + *
+ * SnappyCompressor/SnappyDecompressor
+ * Lz4Compressor/Lz4Decompressor
+ * BuiltInZlibDeflater/new BuiltInZlibInflater
+ *
+ *
+ * Note: we can't use ZlibCompressor/ZlibDecompressor here 
+ * because his constructor can throw exception (if native libraries not found)
+ * For ZlibCompressor/ZlibDecompressor pair testing used {@code TestZlibCompressorDecompressor}   
+ *
+ * 
+ * + */ +public class TestCompressorDecompressor { + + private static final Random rnd = new Random(12345L); + + @Test + public void testCompressorDecompressor() { + // no more for this data + int SIZE = 44 * 1024; + + byte[] rawData = generate(SIZE); + try { + CompressDecompressTester.of(rawData) + .withCompressDecompressPair(new SnappyCompressor(), new SnappyDecompressor()) + .withCompressDecompressPair(new Lz4Compressor(), new Lz4Decompressor()) + .withCompressDecompressPair(new BuiltInZlibDeflater(), new BuiltInZlibInflater()) + .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS, + CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM)) + .test(); + + } catch (Exception ex) { + fail("testCompressorDecompressor error !!!" + ex); + } + } + + @Test + public void testCompressorDecompressorWithExeedBufferLimit() { + int BYTE_SIZE = 100 * 1024; + byte[] rawData = generate(BYTE_SIZE); + try { + CompressDecompressTester.of(rawData) + .withCompressDecompressPair( + new SnappyCompressor(BYTE_SIZE + BYTE_SIZE / 2), + new SnappyDecompressor(BYTE_SIZE + BYTE_SIZE / 2)) + .withCompressDecompressPair(new Lz4Compressor(BYTE_SIZE), + new Lz4Decompressor(BYTE_SIZE)) + .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS, + CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM)) + .test(); + + } catch (Exception ex) { + fail("testCompressorDecompressorWithExeedBufferLimit error !!!" + ex); + } + } + + public static byte[] generate(int size) { + byte[] array = new byte[size]; + for (int i = 0; i < size; i++) + array[i] = (byte) rnd.nextInt(16); + return array; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java new file mode 100644 index 00000000000..6e792d1e4ea --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress.zlib; + +import static org.junit.Assert.*; +import static org.junit.Assume.*; +import java.io.IOException; +import java.io.InputStream; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.compress.CompressDecompressTester; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DecompressorStream; +import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; +import org.junit.Before; +import org.junit.Test; +import com.google.common.collect.ImmutableSet; + +public class TestZlibCompressorDecompressor { + + private static final Random random = new Random(12345L); + + @Before + public void before() { + assumeTrue(ZlibFactory.isNativeZlibLoaded(new Configuration())); + } + + @Test + public void testZlibCompressorDecompressor() { + try { + int SIZE = 44 * 1024; + byte[] rawData = generate(SIZE); + + CompressDecompressTester.of(rawData) + .withCompressDecompressPair(new ZlibCompressor(), new ZlibDecompressor()) + .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS, + CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM)) + .test(); + } catch (Exception ex) { + fail("testCompressorDecompressor error !!!" + ex); + } + } + + @Test + public void testCompressorDecompressorWithExeedBufferLimit() { + int BYTE_SIZE = 100 * 1024; + byte[] rawData = generate(BYTE_SIZE); + try { + CompressDecompressTester.of(rawData) + .withCompressDecompressPair( + new ZlibCompressor( + org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel.BEST_COMPRESSION, + CompressionStrategy.DEFAULT_STRATEGY, + org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionHeader.DEFAULT_HEADER, + BYTE_SIZE), + new ZlibDecompressor( + org.apache.hadoop.io.compress.zlib.ZlibDecompressor.CompressionHeader.DEFAULT_HEADER, + BYTE_SIZE)) + .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK, + CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS, + CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM)) + .test(); + } catch (Exception ex) { + fail("testCompressorDecompressorWithExeedBufferLimit error !!!" + ex); + } + } + + + @Test + public void testZlibCompressorDecompressorWithConfiguration() { + Configuration conf = new Configuration(); + conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true); + if (ZlibFactory.isNativeZlibLoaded(conf)) { + byte[] rawData; + int tryNumber = 5; + int BYTE_SIZE = 10 * 1024; + Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf); + Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf); + rawData = generate(BYTE_SIZE); + try { + for (int i = 0; i < tryNumber; i++) + compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor, + (ZlibDecompressor) zlibDecompressor); + zlibCompressor.reinit(conf); + } catch (Exception ex) { + fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex); + } + } else { + assertTrue("ZlibFactory is using native libs against request", + ZlibFactory.isNativeZlibLoaded(conf)); + } + } + + @Test + public void testZlibCompressDecompress() { + byte[] rawData = null; + int rawDataSize = 0; + rawDataSize = 1024 * 64; + rawData = generate(rawDataSize); + try { + ZlibCompressor compressor = new ZlibCompressor(); + ZlibDecompressor decompressor = new ZlibDecompressor(); + assertFalse("testZlibCompressDecompress finished error", + compressor.finished()); + compressor.setInput(rawData, 0, rawData.length); + assertTrue("testZlibCompressDecompress getBytesRead before error", + compressor.getBytesRead() == 0); + compressor.finish(); + + byte[] compressedResult = new byte[rawDataSize]; + int cSize = compressor.compress(compressedResult, 0, rawDataSize); + assertTrue("testZlibCompressDecompress getBytesRead ather error", + compressor.getBytesRead() == rawDataSize); + assertTrue( + "testZlibCompressDecompress compressed size no less then original size", + cSize < rawDataSize); + decompressor.setInput(compressedResult, 0, cSize); + byte[] decompressedBytes = new byte[rawDataSize]; + decompressor.decompress(decompressedBytes, 0, decompressedBytes.length); + assertArrayEquals("testZlibCompressDecompress arrays not equals ", + rawData, decompressedBytes); + compressor.reset(); + decompressor.reset(); + } catch (IOException ex) { + fail("testZlibCompressDecompress ex !!!" + ex); + } + } + + @Test + public void testZlibCompressorDecompressorSetDictionary() { + Configuration conf = new Configuration(); + conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true); + if (ZlibFactory.isNativeZlibLoaded(conf)) { + Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf); + Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf); + + checkSetDictionaryNullPointerException(zlibCompressor); + checkSetDictionaryNullPointerException(zlibDecompressor); + + checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor); + checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor); + } else { + assertTrue("ZlibFactory is using native libs against request", + ZlibFactory.isNativeZlibLoaded(conf)); + } + } + + @Test + public void testZlibFactory() { + Configuration cfg = new Configuration(); + + assertTrue("testZlibFactory compression level error !!!", + CompressionLevel.DEFAULT_COMPRESSION == ZlibFactory + .getCompressionLevel(cfg)); + + assertTrue("testZlibFactory compression strategy error !!!", + CompressionStrategy.DEFAULT_STRATEGY == ZlibFactory + .getCompressionStrategy(cfg)); + + ZlibFactory.setCompressionLevel(cfg, CompressionLevel.BEST_COMPRESSION); + assertTrue("testZlibFactory compression strategy error !!!", + CompressionLevel.BEST_COMPRESSION == ZlibFactory + .getCompressionLevel(cfg)); + + ZlibFactory.setCompressionStrategy(cfg, CompressionStrategy.FILTERED); + assertTrue("testZlibFactory compression strategy error !!!", + CompressionStrategy.FILTERED == ZlibFactory.getCompressionStrategy(cfg)); + } + + + private boolean checkSetDictionaryNullPointerException( + Decompressor decompressor) { + try { + decompressor.setDictionary(null, 0, 1); + } catch (NullPointerException ex) { + return true; + } catch (Exception ex) { + } + return false; + } + + private boolean checkSetDictionaryNullPointerException(Compressor compressor) { + try { + compressor.setDictionary(null, 0, 1); + } catch (NullPointerException ex) { + return true; + } catch (Exception ex) { + } + return false; + } + + private boolean checkSetDictionaryArrayIndexOutOfBoundsException( + Compressor compressor) { + try { + compressor.setDictionary(new byte[] { (byte) 0 }, 0, -1); + } catch (ArrayIndexOutOfBoundsException e) { + return true; + } catch (Exception e) { + } + return false; + } + + private boolean checkSetDictionaryArrayIndexOutOfBoundsException( + Decompressor decompressor) { + try { + decompressor.setDictionary(new byte[] { (byte) 0 }, 0, -1); + } catch (ArrayIndexOutOfBoundsException e) { + return true; + } catch (Exception e) { + } + return false; + } + + private byte[] compressDecompressZlib(byte[] rawData, + ZlibCompressor zlibCompressor, ZlibDecompressor zlibDecompressor) + throws IOException { + int cSize = 0; + byte[] compressedByte = new byte[rawData.length]; + byte[] decompressedRawData = new byte[rawData.length]; + zlibCompressor.setInput(rawData, 0, rawData.length); + zlibCompressor.finish(); + while (!zlibCompressor.finished()) { + cSize = zlibCompressor.compress(compressedByte, 0, compressedByte.length); + } + zlibCompressor.reset(); + + assertTrue(zlibDecompressor.getBytesWritten() == 0); + assertTrue(zlibDecompressor.getBytesRead() == 0); + assertTrue(zlibDecompressor.needsInput()); + zlibDecompressor.setInput(compressedByte, 0, cSize); + assertFalse(zlibDecompressor.needsInput()); + while (!zlibDecompressor.finished()) { + zlibDecompressor.decompress(decompressedRawData, 0, + decompressedRawData.length); + } + assertTrue(zlibDecompressor.getBytesWritten() == rawData.length); + assertTrue(zlibDecompressor.getBytesRead() == cSize); + zlibDecompressor.reset(); + assertTrue(zlibDecompressor.getRemaining() == 0); + assertArrayEquals( + "testZlibCompressorDecompressorWithConfiguration array equals error", + rawData, decompressedRawData); + + return decompressedRawData; + } + + @Test + public void testBuiltInGzipDecompressorExceptions() { + BuiltInGzipDecompressor decompresser = new BuiltInGzipDecompressor(); + try { + decompresser.setInput(null, 0, 1); + } catch (NullPointerException ex) { + // expected + } catch (Exception ex) { + fail("testBuiltInGzipDecompressorExceptions npe error " + ex); + } + + try { + decompresser.setInput(new byte[] { 0 }, 0, -1); + } catch (ArrayIndexOutOfBoundsException ex) { + // expected + } catch (Exception ex) { + fail("testBuiltInGzipDecompressorExceptions aioob error" + ex); + } + + assertTrue("decompresser.getBytesRead error", + decompresser.getBytesRead() == 0); + assertTrue("decompresser.getRemaining error", + decompresser.getRemaining() == 0); + decompresser.reset(); + decompresser.end(); + + InputStream decompStream = null; + try { + // invalid 0 and 1 bytes , must be 31, -117 + int buffSize = 1 * 1024; + byte buffer[] = new byte[buffSize]; + Decompressor decompressor = new BuiltInGzipDecompressor(); + DataInputBuffer gzbuf = new DataInputBuffer(); + decompStream = new DecompressorStream(gzbuf, decompressor); + gzbuf.reset(new byte[] { 0, 0, 1, 1, 1, 1, 11, 1, 1, 1, 1 }, 11); + decompStream.read(buffer); + } catch (IOException ioex) { + // expected + } catch (Exception ex) { + fail("invalid 0 and 1 byte in gzip stream" + ex); + } + + // invalid 2 byte, must be 8 + try { + int buffSize = 1 * 1024; + byte buffer[] = new byte[buffSize]; + Decompressor decompressor = new BuiltInGzipDecompressor(); + DataInputBuffer gzbuf = new DataInputBuffer(); + decompStream = new DecompressorStream(gzbuf, decompressor); + gzbuf.reset(new byte[] { 31, -117, 7, 1, 1, 1, 1, 11, 1, 1, 1, 1 }, 11); + decompStream.read(buffer); + } catch (IOException ioex) { + // expected + } catch (Exception ex) { + fail("invalid 2 byte in gzip stream" + ex); + } + + try { + int buffSize = 1 * 1024; + byte buffer[] = new byte[buffSize]; + Decompressor decompressor = new BuiltInGzipDecompressor(); + DataInputBuffer gzbuf = new DataInputBuffer(); + decompStream = new DecompressorStream(gzbuf, decompressor); + gzbuf.reset(new byte[] { 31, -117, 8, -32, 1, 1, 1, 11, 1, 1, 1, 1 }, 11); + decompStream.read(buffer); + } catch (IOException ioex) { + // expected + } catch (Exception ex) { + fail("invalid 3 byte in gzip stream" + ex); + } + try { + int buffSize = 1 * 1024; + byte buffer[] = new byte[buffSize]; + Decompressor decompressor = new BuiltInGzipDecompressor(); + DataInputBuffer gzbuf = new DataInputBuffer(); + decompStream = new DecompressorStream(gzbuf, decompressor); + gzbuf.reset(new byte[] { 31, -117, 8, 4, 1, 1, 1, 11, 1, 1, 1, 1 }, 11); + decompStream.read(buffer); + } catch (IOException ioex) { + // expected + } catch (Exception ex) { + fail("invalid 3 byte make hasExtraField" + ex); + } + } + + public static byte[] generate(int size) { + byte[] data = new byte[size]; + for (int i = 0; i < size; i++) + data[i] = (byte)random.nextInt(16); + return data; + } +}