HADOOP-9225. Cover package org.apache.hadoop.compress.Snappy. Contributed by Vadim Bondarev, Andrey Klochkov and Nathan Roberts

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529296 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-10-04 20:56:21 +00:00
parent 3c1f18de43
commit 8549c34917
2 changed files with 345 additions and 0 deletions

View File

@ -345,6 +345,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9254. Cover packages org.apache.hadoop.util.bloom,
org.apache.hadoop.util.hash (Vadim Bondarev via jlowe)
HADOOP-9225. Cover package org.apache.hadoop.compress.Snappy (Vadim
Bondarev, Andrey Klochkov and Nathan Roberts via jlowe)
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

View File

@ -0,0 +1,342 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.snappy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.Random;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assume.*;
public class TestSnappyCompressorDecompressor {
@Before
public void before() {
assumeTrue(SnappyCodec.isNativeCodeLoaded());
}
@Test
public void testSnappyCompressorSetInputNullPointerException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
compressor.setInput(null, 0, 10);
fail("testSnappyCompressorSetInputNullPointerException error !!!");
} catch (NullPointerException ex) {
// excepted
} catch (Exception ex) {
fail("testSnappyCompressorSetInputNullPointerException ex error !!!");
}
}
@Test
public void testSnappyDecompressorSetInputNullPointerException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
decompressor.setInput(null, 0, 10);
fail("testSnappyDecompressorSetInputNullPointerException error !!!");
} catch (NullPointerException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorSetInputNullPointerException ex error !!!");
}
}
@Test
public void testSnappyCompressorSetInputAIOBException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
compressor.setInput(new byte[] {}, -5, 10);
fail("testSnappyCompressorSetInputAIOBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception ex) {
fail("testSnappyCompressorSetInputAIOBException ex error !!!");
}
}
@Test
public void testSnappyDecompressorSetInputAIOUBException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
decompressor.setInput(new byte[] {}, -5, 10);
fail("testSnappyDecompressorSetInputAIOUBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorSetInputAIOUBException ex error !!!");
}
}
@Test
public void testSnappyCompressorCompressNullPointerException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(null, 0, 0);
fail("testSnappyCompressorCompressNullPointerException error !!!");
} catch (NullPointerException ex) {
// expected
} catch (Exception e) {
fail("testSnappyCompressorCompressNullPointerException ex error !!!");
}
}
@Test
public void testSnappyDecompressorCompressNullPointerException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(null, 0, 0);
fail("testSnappyDecompressorCompressNullPointerException error !!!");
} catch (NullPointerException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorCompressNullPointerException ex error !!!");
}
}
@Test
public void testSnappyCompressorCompressAIOBException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(new byte[] {}, 0, -1);
fail("testSnappyCompressorCompressAIOBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception e) {
fail("testSnappyCompressorCompressAIOBException ex error !!!");
}
}
@Test
public void testSnappyDecompressorCompressAIOBException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(new byte[] {}, 0, -1);
fail("testSnappyDecompressorCompressAIOBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorCompressAIOBException ex error !!!");
}
}
@Test
public void testSnappyCompressDecompress() {
int BYTE_SIZE = 1024 * 54;
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
SnappyCompressor compressor = new SnappyCompressor();
try {
compressor.setInput(bytes, 0, bytes.length);
assertTrue("SnappyCompressDecompress getBytesRead error !!!",
compressor.getBytesRead() > 0);
assertTrue(
"SnappyCompressDecompress getBytesWritten before compress error !!!",
compressor.getBytesWritten() == 0);
byte[] compressed = new byte[BYTE_SIZE];
int cSize = compressor.compress(compressed, 0, compressed.length);
assertTrue(
"SnappyCompressDecompress getBytesWritten after compress error !!!",
compressor.getBytesWritten() > 0);
SnappyDecompressor decompressor = new SnappyDecompressor(BYTE_SIZE);
// set as input for decompressor only compressed data indicated with cSize
decompressor.setInput(compressed, 0, cSize);
byte[] decompressed = new byte[BYTE_SIZE];
decompressor.decompress(decompressed, 0, decompressed.length);
assertTrue("testSnappyCompressDecompress finished error !!!",
decompressor.finished());
Assert.assertArrayEquals(bytes, decompressed);
compressor.reset();
decompressor.reset();
assertTrue("decompressor getRemaining error !!!",
decompressor.getRemaining() == 0);
} catch (Exception e) {
fail("testSnappyCompressDecompress ex error!!!");
}
}
@Test
public void testCompressorDecompressorEmptyStreamLogic() {
ByteArrayInputStream bytesIn = null;
ByteArrayOutputStream bytesOut = null;
byte[] buf = null;
BlockDecompressorStream blockDecompressorStream = null;
try {
// compress empty stream
bytesOut = new ByteArrayOutputStream();
BlockCompressorStream blockCompressorStream = new BlockCompressorStream(
bytesOut, new SnappyCompressor(), 1024, 0);
// close without write
blockCompressorStream.close();
// check compressed output
buf = bytesOut.toByteArray();
assertEquals("empty stream compressed output size != 4", 4, buf.length);
// use compressed output as input for decompression
bytesIn = new ByteArrayInputStream(buf);
// create decompression stream
blockDecompressorStream = new BlockDecompressorStream(bytesIn,
new SnappyDecompressor(), 1024);
// no byte is available because stream was closed
assertEquals("return value is not -1", -1, blockDecompressorStream.read());
} catch (Exception e) {
fail("testCompressorDecompressorEmptyStreamLogic ex error !!!"
+ e.getMessage());
} finally {
if (blockDecompressorStream != null)
try {
bytesIn.close();
bytesOut.close();
blockDecompressorStream.close();
} catch (IOException e) {
}
}
}
@Test
public void testSnappyBlockCompression() {
int BYTE_SIZE = 1024 * 50;
int BLOCK_SIZE = 512;
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] block = new byte[BLOCK_SIZE];
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
try {
// Use default of 512 as bufferSize and compressionOverhead of
// (1% of bufferSize + 12 bytes) = 18 bytes (zlib algorithm).
SnappyCompressor compressor = new SnappyCompressor();
int off = 0;
int len = BYTE_SIZE;
int maxSize = BLOCK_SIZE - 18;
if (BYTE_SIZE > maxSize) {
do {
int bufLen = Math.min(len, maxSize);
compressor.setInput(bytes, off, bufLen);
compressor.finish();
while (!compressor.finished()) {
compressor.compress(block, 0, block.length);
out.write(block);
}
compressor.reset();
off += bufLen;
len -= bufLen;
} while (len > 0);
}
assertTrue("testSnappyBlockCompression error !!!",
out.toByteArray().length > 0);
} catch (Exception ex) {
fail("testSnappyBlockCompression ex error !!!");
}
}
@Test
public void testSnappyCompressorDecopressorLogicWithCompressionStreams() {
int BYTE_SIZE = 1024 * 100;
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
int bufferSize = 262144;
int compressionOverhead = (bufferSize / 6) + 32;
DataOutputStream deflateOut = null;
DataInputStream inflateIn = null;
try {
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter = new BlockCompressorStream(
compressedDataBuffer, new SnappyCompressor(bufferSize), bufferSize,
compressionOverhead);
deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(bytes, 0, bytes.length);
deflateOut.flush();
deflateFilter.finish();
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter = new BlockDecompressorStream(
deCompressedDataBuffer, new SnappyDecompressor(bufferSize),
bufferSize);
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
byte[] result = new byte[BYTE_SIZE];
inflateIn.read(result);
Assert.assertArrayEquals(
"original array not equals compress/decompressed array", result,
bytes);
} catch (IOException e) {
fail("testSnappyCompressorDecopressorLogicWithCompressionStreams ex error !!!");
} finally {
try {
if (deflateOut != null)
deflateOut.close();
if (inflateIn != null)
inflateIn.close();
} catch (Exception e) {
}
}
}
static final class BytesGenerator {
private BytesGenerator() {
}
private static final byte[] CACHE = new byte[] { 0x0, 0x1, 0x2, 0x3, 0x4,
0x5, 0x6, 0x7, 0x8, 0x9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF };
private static final Random rnd = new Random(12345l);
public static byte[] get(int size) {
byte[] array = (byte[]) Array.newInstance(byte.class, size);
for (int i = 0; i < size; i++)
array[i] = CACHE[rnd.nextInt(CACHE.length - 1)];
return array;
}
}
}