mirror of https://github.com/apache/druid.git
Updates to CompressedObjectStrategy to support more compression types
* Compression types are not yet dynamically configurable. * Added a benchmarking system for topN to test the compression * Updated pom.xml to include junit benchmarking * added an Uncompressed option
This commit is contained in:
parent
2336e6c167
commit
8bd6cf0d07
|
@ -49,13 +49,13 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
@Override
|
@Override
|
||||||
public Decompressor getDecompressor()
|
public Decompressor getDecompressor()
|
||||||
{
|
{
|
||||||
return new LZFDecompressor();
|
return LZFDecompressor.defaultDecompressor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Compressor getCompressor()
|
public Compressor getCompressor()
|
||||||
{
|
{
|
||||||
return new LZFCompressor();
|
return LZFCompressor.defaultCompressor;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -63,15 +63,26 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
@Override
|
@Override
|
||||||
public Decompressor getDecompressor()
|
public Decompressor getDecompressor()
|
||||||
{
|
{
|
||||||
return new LZ4Decompressor();
|
return LZ4Decompressor.defaultDecompressor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Compressor getCompressor()
|
public Compressor getCompressor()
|
||||||
{
|
{
|
||||||
return new LZ4Compressor();
|
return LZ4Compressor.defaultCompressor;
|
||||||
}
|
}
|
||||||
};
|
},
|
||||||
|
UNCOMPRESSED((byte)0x2){
|
||||||
|
@Override
|
||||||
|
public Decompressor getDecompressor(){
|
||||||
|
return UncompressedDecompressor.defaultDecompressor;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public Compressor getCompressor(){
|
||||||
|
return UncompressedCompressor.defaultCompressor;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
;
|
||||||
|
|
||||||
final byte id;
|
final byte id;
|
||||||
|
|
||||||
|
@ -120,9 +131,35 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
*/
|
*/
|
||||||
public byte[] compress(byte[] bytes);
|
public byte[] compress(byte[] bytes);
|
||||||
}
|
}
|
||||||
|
public static class UncompressedCompressor implements Compressor{
|
||||||
|
private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor();
|
||||||
|
@Override
|
||||||
|
public byte[] compress(byte[] bytes) {
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static class UncompressedDecompressor implements Decompressor{
|
||||||
|
private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor();
|
||||||
|
@Override
|
||||||
|
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) {
|
||||||
|
final int maxCopy = Math.min(numBytes, out.remaining());
|
||||||
|
final ByteBuffer copyBuffer = in.duplicate();
|
||||||
|
copyBuffer.limit(copyBuffer.position() + maxCopy);
|
||||||
|
out.put(copyBuffer);
|
||||||
|
|
||||||
|
// Setup the buffers properly
|
||||||
|
out.flip();
|
||||||
|
in.position(in.position() + maxCopy);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) {
|
||||||
|
decompress(in, numBytes, out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class LZFDecompressor implements Decompressor
|
public static class LZFDecompressor implements Decompressor
|
||||||
{
|
{
|
||||||
|
private static final LZFDecompressor defaultDecompressor = new LZFDecompressor();
|
||||||
@Override
|
@Override
|
||||||
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
|
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
|
||||||
{
|
{
|
||||||
|
@ -149,6 +186,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
|
|
||||||
public static class LZFCompressor implements Compressor
|
public static class LZFCompressor implements Compressor
|
||||||
{
|
{
|
||||||
|
private static final LZFCompressor defaultCompressor = new LZFCompressor();
|
||||||
@Override
|
@Override
|
||||||
public byte[] compress(byte[] bytes)
|
public byte[] compress(byte[] bytes)
|
||||||
{
|
{
|
||||||
|
@ -162,9 +200,9 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
|
|
||||||
public static class LZ4Decompressor implements Decompressor
|
public static class LZ4Decompressor implements Decompressor
|
||||||
{
|
{
|
||||||
private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
|
private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor();
|
||||||
private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
|
private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
|
||||||
|
private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor();
|
||||||
@Override
|
@Override
|
||||||
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
|
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
|
||||||
{
|
{
|
||||||
|
@ -173,8 +211,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
|
|
||||||
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
|
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
|
||||||
final byte[] outputBytes = outputBytesHolder.get();
|
final byte[] outputBytes = outputBytesHolder.get();
|
||||||
final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
|
final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length);
|
||||||
|
|
||||||
out.put(outputBytes, 0, numDecompressedBytes);
|
out.put(outputBytes, 0, numDecompressedBytes);
|
||||||
out.flip();
|
out.flip();
|
||||||
}
|
}
|
||||||
|
@ -189,6 +226,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
final byte[] bytes = new byte[numBytes];
|
final byte[] bytes = new byte[numBytes];
|
||||||
in.get(bytes);
|
in.get(bytes);
|
||||||
|
|
||||||
|
// TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java
|
||||||
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
|
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
|
||||||
final byte[] outputBytes = outputBytesHolder.get();
|
final byte[] outputBytes = outputBytesHolder.get();
|
||||||
lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
|
lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
|
||||||
|
@ -204,16 +242,14 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
|
|
||||||
public static class LZ4Compressor implements Compressor
|
public static class LZ4Compressor implements Compressor
|
||||||
{
|
{
|
||||||
private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
|
private static final LZ4Compressor defaultCompressor = new LZ4Compressor();
|
||||||
|
private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor();
|
||||||
|
private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] compress(byte[] bytes)
|
public byte[] compress(byte[] bytes)
|
||||||
{
|
{
|
||||||
final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
|
return lz4High.compress(bytes);
|
||||||
final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
|
|
||||||
final byte[] out = new byte[outputBytes];
|
|
||||||
System.arraycopy(intermediate, 0, out, 0, outputBytes);
|
|
||||||
return out;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue