Merge pull request #850 from metamx/druid-0.7.x-compressionstrategy

Compression strategy changes
This commit is contained in:
xvrl 2014-11-25 12:58:39 -08:00
commit 5bc1be5ba0
2 changed files with 254 additions and 39 deletions

View File

@ -22,6 +22,7 @@ package io.druid.segment.data;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFDecoder;
@ -33,49 +34,65 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
/**
*/
*/
public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrategy<ResourceHolder<T>>
{
private static final Logger log = new Logger(CompressedObjectStrategy.class);
public static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.LZ4;
public static enum CompressionStrategy {
LZF ((byte)0x0)
{
@Override
public Decompressor getDecompressor()
{
return new LZFDecompressor();
}
@Override
public Compressor getCompressor()
{
return new LZFCompressor();
}
},
LZ4 ((byte)0x1) {
public static enum CompressionStrategy
{
LZF((byte) 0x0) {
@Override
public Decompressor getDecompressor()
{
return new LZ4Decompressor();
return LZFDecompressor.defaultDecompressor;
}
@Override
public Compressor getCompressor()
{
return new LZ4Compressor();
return LZFCompressor.defaultCompressor;
}
},
LZ4((byte) 0x1) {
@Override
public Decompressor getDecompressor()
{
return LZ4Decompressor.defaultDecompressor;
}
@Override
public Compressor getCompressor()
{
return LZ4Compressor.defaultCompressor;
}
},
UNCOMPRESSED((byte) 0xFF) {
@Override
public Decompressor getDecompressor()
{
return UncompressedDecompressor.defaultDecompressor;
}
@Override
public Compressor getCompressor()
{
return UncompressedCompressor.defaultCompressor;
}
};
final byte id;
CompressionStrategy(byte id) {
CompressionStrategy(byte id)
{
this.id = id;
}
@ -83,12 +100,17 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
{
return id;
}
public abstract Compressor getCompressor();
public abstract Decompressor getDecompressor();
static final Map<Byte, CompressionStrategy> idMap = Maps.newHashMap();
static {
for(CompressionStrategy strategy : CompressionStrategy.values()) idMap.put(strategy.getId(), strategy);
for (CompressionStrategy strategy : CompressionStrategy.values()) {
idMap.put(strategy.getId(), strategy);
}
}
public static CompressionStrategy forId(byte id)
@ -107,6 +129,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
* @param out
*/
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out);
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize);
}
@ -116,13 +139,47 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
* Currently assumes buf is an array backed ByteBuffer
*
* @param bytes
*
* @return
*/
public byte[] compress(byte[] bytes);
}
public static class UncompressedCompressor implements Compressor
{
private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor();
@Override
public byte[] compress(byte[] bytes)
{
return bytes;
}
}
public static class UncompressedDecompressor implements Decompressor
{
private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
final ByteBuffer copyBuffer = in.duplicate();
copyBuffer.limit(copyBuffer.position() + numBytes);
out.put(copyBuffer).flip();
in.position(in.position() + numBytes);
}
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
{
decompress(in, numBytes, out);
}
}
public static class LZFDecompressor implements Decompressor
{
private static final LZFDecompressor defaultDecompressor = new LZFDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
@ -136,7 +193,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
log.error(e, "IOException thrown while closing ChunkEncoder.");
}
}
@ -149,21 +206,29 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
public static class LZFCompressor implements Compressor
{
private static final LZFCompressor defaultCompressor = new LZFCompressor();
@Override
public byte[] compress(byte[] bytes)
{
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
CloseQuietly.close(encoder);
return chunk.getData();
try (final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder()) {
final LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
return chunk.getData();
}
catch (IOException e) {
log.error(e, "IOException thrown while closing ChunkEncoder.");
}
// IOException should be on ResourceHolder.close(), not encodeChunk, so this *should* never happen
return null;
}
}
public static class LZ4Decompressor implements Decompressor
{
private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestInstance().safeDecompressor();
private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestInstance().fastDecompressor();
private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
@ -173,13 +238,13 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
// Since decompressed size is NOT known, must use lz4Safe
final int numDecompressedBytes = lz4Safe.decompress(bytes, outputBytes);
out.put(outputBytes, 0, numDecompressedBytes);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
log.error(e, "IOException thrown while closing ChunkEncoder.");
}
}
@ -189,6 +254,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
final byte[] bytes = new byte[numBytes];
in.get(bytes);
// TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
@ -197,23 +263,20 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
log.error(e, "IOException thrown while closing ChunkEncoder.");
}
}
}
public static class LZ4Compressor implements Compressor
{
private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
private static final LZ4Compressor defaultCompressor = new LZ4Compressor();
private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestInstance().highCompressor();
@Override
public byte[] compress(byte[] bytes)
{
final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
final byte[] out = new byte[outputBytes];
System.arraycopy(intermediate, 0, out, 0, outputBytes);
return out;
return lz4High.compress(bytes);
}
}
@ -298,8 +361,11 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
public static interface BufferConverter<T>
{
public T convert(ByteBuffer buf);
public int compare(T lhs, T rhs);
public int sizeOf(int count);
public T combine(ByteBuffer into, T from);
}
}

View File

@ -21,10 +21,31 @@ package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.segment.CompressedPools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class CompressionStrategyTest
{
@Parameterized.Parameters
@ -49,4 +70,132 @@ public class CompressionStrategyTest
{
this.compressionStrategy = compressionStrategy;
}
// MUST be smaller than CompressedPools.BUFFER_SIZE
private static final int DATA_SIZER = 0xFFFF;
private static byte[] originalData;
@BeforeClass
public static void setupClass()
{
originalData = new byte[DATA_SIZER];
Random random = new Random(54671457);
random.nextBytes(originalData);
}
@Test
public void testBasicOperations()
{
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
ByteBuffer output = ByteBuffer.allocate(originalData.length);
compressionStrategy.getDecompressor().decompress(compressed, compressed.array().length, output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
}
@Test
public void testOutputSizeKnownOperations()
{
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
ByteBuffer output = ByteBuffer.allocate(originalData.length);
compressionStrategy.getDecompressor()
.decompress(compressed, compressed.array().length, output, originalData.length);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
}
@Test
public void testDirectMemoryOperations()
{
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
ByteBuffer output = ByteBuffer.allocateDirect(originalData.length);
compressionStrategy.getDecompressor().decompress(compressed, compressed.array().length, output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
}
@Test
public void testConcurrency() throws InterruptedException, ExecutionException, TimeoutException
{
final int numThreads = 20;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(numThreads);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
numThreads,
numThreads,
100,
TimeUnit.MILLISECONDS,
queue
);
Collection<Future<Boolean>> results = new ArrayList<>();
for (int i = 0; i < numThreads; ++i) {
results.add(
threadPoolExecutor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
ByteBuffer output = ByteBuffer.allocate(originalData.length);
compressionStrategy.getDecompressor().decompress(compressed, compressed.array().length, output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
return true;
}
}
)
);
}
threadPoolExecutor.shutdown();
for (Future result : results) {
Assert.assertTrue((Boolean) result.get(100, TimeUnit.MILLISECONDS));
}
}
@Test
public void testKnownSizeConcurrency() throws InterruptedException, ExecutionException, TimeoutException
{
final int numThreads = 20;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(numThreads);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
numThreads,
numThreads,
100,
TimeUnit.MILLISECONDS,
queue
);
Collection<Future<Boolean>> results = new ArrayList<>();
for (int i = 0; i < numThreads; ++i) {
results.add(
threadPoolExecutor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
ByteBuffer output = ByteBuffer.allocate(originalData.length);
// TODO: Lambdas would be nice here whenever we use Java 8
compressionStrategy.getDecompressor()
.decompress(compressed, compressed.array().length, output, originalData.length);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
return true;
}
}
)
);
}
threadPoolExecutor.shutdown();
for (Future result : results) {
Assert.assertTrue((Boolean) result.get(500, TimeUnit.MILLISECONDS));
}
}
}