mirror of https://github.com/apache/druid.git
CompressedObjectStrategy improvements
* Added more unit tests * Now properly uses safe / fast decompressor for LZ4 * Now chooses fastest lz4 instance instead of only looking at Java implmentations * Encapsulate ResourceHolder in try-with-resources to make sure they close correctly
This commit is contained in:
parent
ccc757dc64
commit
18f44beee9
|
@ -33,6 +33,7 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.Buffer;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Map;
|
||||
|
@ -163,14 +164,10 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
|||
@Override
|
||||
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
|
||||
{
|
||||
final int maxCopy = Math.min(numBytes, out.remaining());
|
||||
final ByteBuffer copyBuffer = in.duplicate();
|
||||
copyBuffer.limit(copyBuffer.position() + maxCopy);
|
||||
out.put(copyBuffer);
|
||||
|
||||
// Setup the buffers properly
|
||||
out.flip();
|
||||
in.position(in.position() + maxCopy);
|
||||
copyBuffer.limit(copyBuffer.position() + numBytes);
|
||||
out.put(copyBuffer).flip();
|
||||
in.position(in.position() + numBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -215,18 +212,20 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
|||
@Override
|
||||
public byte[] compress(byte[] bytes)
|
||||
{
|
||||
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
|
||||
LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
|
||||
CloseQuietly.close(encoder);
|
||||
|
||||
return chunk.getData();
|
||||
try(final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder()) {
|
||||
LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
|
||||
return chunk.getData();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class LZ4Decompressor implements Decompressor
|
||||
{
|
||||
private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor();
|
||||
private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
|
||||
private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestInstance().safeDecompressor();
|
||||
private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestInstance().fastDecompressor();
|
||||
private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor();
|
||||
|
||||
@Override
|
||||
|
@ -237,7 +236,8 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
|||
|
||||
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
|
||||
final byte[] outputBytes = outputBytesHolder.get();
|
||||
final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length);
|
||||
// Since decompressed size is NOT known, must use lz4Safe
|
||||
final int numDecompressedBytes = lz4Safe.decompress(bytes,outputBytes);
|
||||
out.put(outputBytes, 0, numDecompressedBytes);
|
||||
out.flip();
|
||||
}
|
||||
|
@ -269,8 +269,8 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
|||
public static class LZ4Compressor implements Compressor
|
||||
{
|
||||
private static final LZ4Compressor defaultCompressor = new LZ4Compressor();
|
||||
private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor();
|
||||
private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor();
|
||||
private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestInstance().fastCompressor();
|
||||
private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestInstance().highCompressor();
|
||||
|
||||
@Override
|
||||
public byte[] compress(byte[] bytes)
|
||||
|
|
|
@ -21,10 +21,31 @@ package io.druid.segment.data;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.druid.segment.CompressedPools;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class CompressionStrategyTest
|
||||
{
|
||||
@Parameterized.Parameters
|
||||
|
@ -49,4 +70,138 @@ public class CompressionStrategyTest
|
|||
{
|
||||
this.compressionStrategy = compressionStrategy;
|
||||
}
|
||||
|
||||
// MUST be smaller than CompressedPools.BUFFER_SIZE
|
||||
private static final int DATA_SIZER = 0xFFFF;
|
||||
private static byte[] originalData;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass()
|
||||
{
|
||||
originalData = new byte[DATA_SIZER];
|
||||
Random random = new Random(54671457);
|
||||
random.nextBytes(originalData);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicOperations()
|
||||
{
|
||||
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
|
||||
ByteBuffer output = ByteBuffer.allocate(originalData.length);
|
||||
compressionStrategy.getDecompressor().decompress(compressed, compressed.array().length, output);
|
||||
byte[] checkArray = new byte[DATA_SIZER];
|
||||
output.get(checkArray);
|
||||
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testOutputSizeKnownOperations()
|
||||
{
|
||||
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
|
||||
ByteBuffer output = ByteBuffer.allocate(originalData.length);
|
||||
compressionStrategy.getDecompressor()
|
||||
.decompress(compressed, compressed.array().length, output, originalData.length);
|
||||
byte[] checkArray = new byte[DATA_SIZER];
|
||||
output.get(checkArray);
|
||||
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectMemoryOperations()
|
||||
{
|
||||
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
|
||||
ByteBuffer output = ByteBuffer.allocateDirect(originalData.length);
|
||||
compressionStrategy.getDecompressor().decompress(compressed, compressed.array().length, output);
|
||||
byte[] checkArray = new byte[DATA_SIZER];
|
||||
output.get(checkArray);
|
||||
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrency() throws InterruptedException, ExecutionException, TimeoutException
|
||||
{
|
||||
final int numThreads = 20;
|
||||
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(numThreads);
|
||||
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
||||
numThreads,
|
||||
numThreads,
|
||||
100,
|
||||
TimeUnit.MILLISECONDS,
|
||||
queue
|
||||
);
|
||||
Collection<Future<Boolean>> results = new ArrayList<>();
|
||||
for (int i = 0; i < numThreads; ++i) {
|
||||
results.add(
|
||||
threadPoolExecutor.submit(
|
||||
new Callable<Boolean>()
|
||||
{
|
||||
@Override
|
||||
public Boolean call() throws Exception
|
||||
{
|
||||
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
|
||||
ByteBuffer output = ByteBuffer.allocate(originalData.length);
|
||||
compressionStrategy.getDecompressor().decompress(compressed, compressed.array().length, output);
|
||||
byte[] checkArray = new byte[DATA_SIZER];
|
||||
output.get(checkArray);
|
||||
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
threadPoolExecutor.shutdown();
|
||||
for (Future result : results) {
|
||||
Assert.assertTrue((Boolean) result.get(100, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testKnownSizeConcurrency() throws InterruptedException, ExecutionException, TimeoutException
|
||||
{
|
||||
final int numThreads = 20;
|
||||
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(numThreads);
|
||||
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
||||
numThreads,
|
||||
numThreads,
|
||||
100,
|
||||
TimeUnit.MILLISECONDS,
|
||||
queue
|
||||
);
|
||||
Collection<Future<Boolean>> results = new ArrayList<>();
|
||||
for (int i = 0; i < numThreads; ++i) {
|
||||
results.add(
|
||||
threadPoolExecutor.submit(
|
||||
new Callable<Boolean>()
|
||||
{
|
||||
@Override
|
||||
public Boolean call() throws Exception
|
||||
{
|
||||
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
|
||||
ByteBuffer output = ByteBuffer.allocate(originalData.length);
|
||||
// TODO: Lambdas would be nice here whenever we use Java 8
|
||||
compressionStrategy.getDecompressor()
|
||||
.decompress(compressed, compressed.array().length, output, originalData.length);
|
||||
byte[] checkArray = new byte[DATA_SIZER];
|
||||
output.get(checkArray);
|
||||
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
threadPoolExecutor.shutdown();
|
||||
for (Future result : results) {
|
||||
Assert.assertTrue((Boolean) result.get(500, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue