diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index 0796b60f66b..0ea2d86cda9 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -49,13 +49,13 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return new LZFDecompressor(); + return LZFDecompressor.defaultDecompressor; } @Override public Compressor getCompressor() { - return new LZFCompressor(); + return LZFCompressor.defaultCompressor; } }, @@ -63,15 +63,26 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return new LZ4Decompressor(); + return LZ4Decompressor.defaultDecompressor; } @Override public Compressor getCompressor() { - return new LZ4Compressor(); + return LZ4Compressor.defaultCompressor; } - }; + }, + UNCOMPRESSED((byte)0x2){ + @Override + public Decompressor getDecompressor(){ + return UncompressedDecompressor.defaultDecompressor; + } + @Override + public Compressor getCompressor(){ + return UncompressedCompressor.defaultCompressor; + } + } + ; final byte id; @@ -120,9 +131,35 @@ public class CompressedObjectStrategy implements ObjectStrateg */ public byte[] compress(byte[] bytes); } + public static class UncompressedCompressor implements Compressor{ + private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor(); + @Override + public byte[] compress(byte[] bytes) { + return bytes; + } + } + public static class UncompressedDecompressor implements Decompressor{ + private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor(); + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { + final int maxCopy = Math.min(numBytes, out.remaining()); + final ByteBuffer copyBuffer = in.duplicate(); + copyBuffer.limit(copyBuffer.position() + maxCopy); + out.put(copyBuffer); + + // Setup the buffers properly + out.flip(); + in.position(in.position() + maxCopy); + } + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) { + decompress(in, numBytes, out); + } + } public static class LZFDecompressor implements Decompressor { + private static final LZFDecompressor defaultDecompressor = new LZFDecompressor(); @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -149,6 +186,7 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZFCompressor implements Compressor { + private static final LZFCompressor defaultCompressor = new LZFCompressor(); @Override public byte[] compress(byte[] bytes) { @@ -162,9 +200,9 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Decompressor implements Decompressor { - private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor(); - private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); - + private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor(); + private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); + private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor(); @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -173,8 +211,7 @@ public class CompressedObjectStrategy implements ObjectStrateg try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); - final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length); - + final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length); out.put(outputBytes, 0, numDecompressedBytes); out.flip(); } @@ -189,6 +226,7 @@ public class CompressedObjectStrategy implements ObjectStrateg final byte[] bytes = new byte[numBytes]; in.get(bytes); + // TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize); @@ -204,16 +242,14 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Compressor implements Compressor { - private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor(); + private static final LZ4Compressor defaultCompressor = new LZ4Compressor(); + private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor(); + private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor(); @Override public byte[] compress(byte[] bytes) { - final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)]; - final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length); - final byte[] out = new byte[outputBytes]; - System.arraycopy(intermediate, 0, out, 0, outputBytes); - return out; + return lz4High.compress(bytes); } }