From c40a315c819ac472605837c6d26d8158bc01561b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 8 May 2014 18:06:26 -0700 Subject: [PATCH 1/3] initial support for LZ4 compression --- pom.xml | 2 +- processing/pom.xml | 4 + .../segment/FloatMetricColumnSerializer.java | 4 +- .../java/io/druid/segment/IndexMerger.java | 4 +- .../CompressedFloatBufferObjectStrategy.java | 20 +- .../data/CompressedFloatsIndexedSupplier.java | 61 ++++- .../CompressedFloatsSupplierSerializer.java | 17 +- .../CompressedLongBufferObjectStrategy.java | 19 +- .../data/CompressedLongsIndexedSupplier.java | 47 +++- .../CompressedLongsSupplierSerializer.java | 16 +- .../data/CompressedObjectStrategy.java | 254 +++++++++++++++--- .../FixedSizeCompressedObjectStrategy.java | 50 ++++ .../data/InMemoryCompressedFloats.java | 10 +- .../segment/data/InMemoryCompressedLongs.java | 10 +- .../CompressedFloatsIndexedSupplierTest.java | 5 +- ...ompressedFloatsSupplierSerializerTest.java | 12 +- .../CompressedLongsIndexedSupplierTest.java | 5 +- ...CompressedLongsSupplierSerializerTest.java | 8 +- 18 files changed, 447 insertions(+), 101 deletions(-) create mode 100644 processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java diff --git a/pom.xml b/pom.xml index 2b7f88276e8..642a0cf6887 100644 --- a/pom.xml +++ b/pom.xml @@ -379,7 +379,7 @@ net.jpountz.lz4 lz4 - 1.1.2 + 1.2.0 com.google.protobuf diff --git a/processing/pom.xml b/processing/pom.xml index 755d2f553f7..31ac9652129 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -82,6 +82,10 @@ com.davekoelle alphanum + + net.jpountz.lz4 + lz4 + diff --git a/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java b/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java index 4caa6b95fe2..520708fe015 100644 --- a/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java +++ b/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java @@ -21,6 +21,7 @@ package io.druid.segment; import com.google.common.io.Files; import io.druid.segment.data.CompressedFloatsSupplierSerializer; +import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.IOPeon; import java.io.File; @@ -51,7 +52,8 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer public void open() throws IOException { writer = CompressedFloatsSupplierSerializer.create( - ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER + ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER, + CompressedObjectStrategy.CompressionStrategy.LZ4 // TODO define this somewhere else ); writer.open(); diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 99f854da9af..87cfa25a2ed 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -54,6 +54,7 @@ import io.druid.query.aggregation.ToLowerCaseAggregatorFactory; import io.druid.segment.column.ColumnConfig; import io.druid.segment.data.ByteBufferWriter; import io.druid.segment.data.CompressedLongsSupplierSerializer; +import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.ConciseCompressedIndexedInts; import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.GenericIndexedWriter; @@ -594,7 +595,8 @@ public class IndexMerger Iterable theRows = rowMergerFn.apply(boats); CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create( - ioPeon, "little_end_time", IndexIO.BYTE_ORDER + ioPeon, "little_end_time", IndexIO.BYTE_ORDER, + CompressedObjectStrategy.CompressionStrategy.LZ4 // TODO define this somewhere else ); timeWriter.open(); diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java index 7a9906cd364..e6294462afe 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java @@ -28,14 +28,16 @@ import java.nio.FloatBuffer; /** */ -public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrategy +public class CompressedFloatBufferObjectStrategy extends FixedSizeCompressedObjectStrategy { - public static CompressedFloatBufferObjectStrategy getBufferForOrder(ByteOrder order) + public static CompressedFloatBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer) { - return new CompressedFloatBufferObjectStrategy(order); + return new CompressedFloatBufferObjectStrategy(order, compression, sizePer); } - private CompressedFloatBufferObjectStrategy(final ByteOrder order) + private final int sizePer; + + private CompressedFloatBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer) { super( order, @@ -64,7 +66,15 @@ public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrateg { return into.asFloatBuffer().put(from); } - } + }, + compression ); + this.sizePer = sizePer; + } + + @Override + public int getSize() + { + return sizePer; } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java index 1beccc5426f..35a3b03f0b2 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java @@ -40,22 +40,26 @@ import java.util.Iterator; */ public class CompressedFloatsIndexedSupplier implements Supplier { - public static final byte version = 0x1; + public static final byte LZF_VERSION = 0x1; + public static final byte version = 0x2; public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2); private final int totalSize; private final int sizePer; private final GenericIndexed> baseFloatBuffers; + private final CompressedObjectStrategy.CompressionStrategy compression; CompressedFloatsIndexedSupplier( int totalSize, int sizePer, - GenericIndexed> baseFloatBuffers + GenericIndexed> baseFloatBuffers, + CompressedObjectStrategy.CompressionStrategy compression ) { this.totalSize = totalSize; this.sizePer = sizePer; this.baseFloatBuffers = baseFloatBuffers; + this.compression = compression; } public int size() @@ -151,7 +155,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier public long getSerializedSize() { - return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4; + return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4 + 1; } public void writeToChannel(WritableByteChannel channel) throws IOException @@ -159,6 +163,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier channel.write(ByteBuffer.wrap(new byte[]{version})); channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize))); channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer))); + channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); baseFloatBuffers.writeToChannel(channel); } @@ -167,7 +172,8 @@ public class CompressedFloatsIndexedSupplier implements Supplier return new CompressedFloatsIndexedSupplier( totalSize, sizePer, - GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)) + GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + compression ); } @@ -191,23 +197,53 @@ public class CompressedFloatsIndexedSupplier implements Supplier byte versionFromBuffer = buffer.get(); if (versionFromBuffer == version) { + final int totalSize = buffer.getInt(); + final int sizePer = buffer.getInt(); + final CompressedObjectStrategy.CompressionStrategy compression = + CompressedObjectStrategy.CompressionStrategy.forId(buffer.get()); + return new CompressedFloatsIndexedSupplier( - buffer.getInt(), - buffer.getInt(), - GenericIndexed.read(buffer, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)) + totalSize, + sizePer, + GenericIndexed.read( + buffer, + CompressedFloatBufferObjectStrategy.getBufferForOrder( + order, + compression, + sizePer + ) + ), + compression + ); + } else if (versionFromBuffer == LZF_VERSION) { + final int totalSize = buffer.getInt(); + final int sizePer = buffer.getInt(); + final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF; + return new CompressedFloatsIndexedSupplier( + totalSize, + sizePer, + GenericIndexed.read( + buffer, + CompressedFloatBufferObjectStrategy.getBufferForOrder( + order, + compression, + sizePer + ) + ), + compression ); } throw new IAE("Unknown version[%s]", versionFromBuffer); } - public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order) + public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression) { - return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order); + return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order, compression); } public static CompressedFloatsIndexedSupplier fromFloatBuffer( - final FloatBuffer buffer, final int chunkFactor, final ByteOrder order + final FloatBuffer buffer, final int chunkFactor, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression ) { Preconditions.checkArgument( @@ -254,8 +290,9 @@ public class CompressedFloatsIndexedSupplier implements Supplier }; } }, - CompressedFloatBufferObjectStrategy.getBufferForOrder(order) - ) + CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, chunkFactor) + ), + compression ); } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java index da967898dba..89fa28d2c84 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java @@ -36,27 +36,29 @@ import java.nio.FloatBuffer; public class CompressedFloatsSupplierSerializer { public static CompressedFloatsSupplierSerializer create( - IOPeon ioPeon, final String filenameBase, final ByteOrder order + IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression ) throws IOException { - return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order); + return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order, compression); } public static CompressedFloatsSupplierSerializer create( - IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order + IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression ) throws IOException { final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer( sizePer, new GenericIndexedWriter>( - ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order) - ) + ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer) + ), + compression ); return retVal; } private final int sizePer; private final GenericIndexedWriter> flattener; + private final CompressedObjectStrategy.CompressionStrategy compression; private int numInserted = 0; @@ -64,11 +66,13 @@ public class CompressedFloatsSupplierSerializer public CompressedFloatsSupplierSerializer( int sizePer, - GenericIndexedWriter> flattener + GenericIndexedWriter> flattener, + CompressedObjectStrategy.CompressionStrategy compression ) { this.sizePer = sizePer; this.flattener = flattener; + this.compression = compression; endBuffer = FloatBuffer.allocate(sizePer); endBuffer.mark(); @@ -110,6 +114,7 @@ public class CompressedFloatsSupplierSerializer out.write(CompressedFloatsIndexedSupplier.version); out.write(Ints.toByteArray(numInserted)); out.write(Ints.toByteArray(sizePer)); + out.write(new byte[]{compression.getId()}); ByteStreams.copy(flattener.combineStreams(), out); } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java index 823c86a0516..b0b511278f1 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java @@ -28,14 +28,16 @@ import java.nio.LongBuffer; /** */ -public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy +public class CompressedLongBufferObjectStrategy extends FixedSizeCompressedObjectStrategy { - public static CompressedLongBufferObjectStrategy getBufferForOrder(ByteOrder order) + public static CompressedLongBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer) { - return new CompressedLongBufferObjectStrategy(order); + return new CompressedLongBufferObjectStrategy(order, compression, sizePer); } - private CompressedLongBufferObjectStrategy(final ByteOrder order) + private final int sizePer; + + private CompressedLongBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer) { super( order, @@ -64,8 +66,15 @@ public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy { return into.asLongBuffer().put(from); } - } + }, + compression ); + this.sizePer = sizePer; } + @Override + public int getSize() + { + return sizePer; + } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java index 8ad267168a5..56998d09886 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java @@ -40,21 +40,25 @@ import java.util.Iterator; */ public class CompressedLongsIndexedSupplier implements Supplier { - public static final byte version = 0x1; + public static final byte LZF_VERSION = 0x1; + public static final byte version = 0x2; private final int totalSize; private final int sizePer; private final GenericIndexed> baseLongBuffers; + private final CompressedObjectStrategy.CompressionStrategy compression; CompressedLongsIndexedSupplier( int totalSize, int sizePer, - GenericIndexed> baseLongBuffers + GenericIndexed> baseLongBuffers, + CompressedObjectStrategy.CompressionStrategy compression ) { this.totalSize = totalSize; this.sizePer = sizePer; this.baseLongBuffers = baseLongBuffers; + this.compression = compression; } public int size() @@ -162,7 +166,7 @@ public class CompressedLongsIndexedSupplier implements Supplier public long getSerializedSize() { - return baseLongBuffers.getSerializedSize() + 1 + 4 + 4; + return baseLongBuffers.getSerializedSize() + 1 + 4 + 4 + 1; } public void writeToChannel(WritableByteChannel channel) throws IOException @@ -170,15 +174,17 @@ public class CompressedLongsIndexedSupplier implements Supplier channel.write(ByteBuffer.wrap(new byte[]{version})); channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize))); channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer))); + channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); baseLongBuffers.writeToChannel(channel); } - public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order) + public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression) { return new CompressedLongsIndexedSupplier( totalSize, sizePer, - GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order)) + GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + compression ); } @@ -196,23 +202,37 @@ public class CompressedLongsIndexedSupplier implements Supplier byte versionFromBuffer = buffer.get(); if (versionFromBuffer == version) { + final int totalSize = buffer.getInt(); + final int sizePer = buffer.getInt(); + final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get()); return new CompressedLongsIndexedSupplier( - buffer.getInt(), - buffer.getInt(), - GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order)) + totalSize, + sizePer, + GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + compression + ); + } else if (versionFromBuffer == LZF_VERSION) { + final int totalSize = buffer.getInt(); + final int sizePer = buffer.getInt(); + final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF; + return new CompressedLongsIndexedSupplier( + totalSize, + sizePer, + GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + compression ); } throw new IAE("Unknown version[%s]", versionFromBuffer); } - public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder) + public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression) { - return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder); + return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder, compression); } public static CompressedLongsIndexedSupplier fromLongBuffer( - final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder + final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression ) { Preconditions.checkArgument( @@ -259,8 +279,9 @@ public class CompressedLongsIndexedSupplier implements Supplier }; } }, - CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder) - ) + CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor) + ), + compression ); } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java index b0e63a8e391..97a7545009a 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java @@ -37,20 +37,23 @@ import java.nio.LongBuffer; public class CompressedLongsSupplierSerializer { public static CompressedLongsSupplierSerializer create( - IOPeon ioPeon, final String filenameBase, final ByteOrder order + IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression ) throws IOException { + final int sizePer = 0xFFFF / Longs.BYTES; final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer( - 0xFFFF / Longs.BYTES, + sizePer, new GenericIndexedWriter>( - ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order) - ) + ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer) + ), + compression ); return retVal; } private final int sizePer; private final GenericIndexedWriter> flattener; + private final CompressedObjectStrategy.CompressionStrategy compression; private int numInserted = 0; @@ -58,11 +61,13 @@ public class CompressedLongsSupplierSerializer public CompressedLongsSupplierSerializer( int sizePer, - GenericIndexedWriter> flattener + GenericIndexedWriter> flattener, + CompressedObjectStrategy.CompressionStrategy compression ) { this.sizePer = sizePer; this.flattener = flattener; + this.compression = compression; endBuffer = LongBuffer.allocate(sizePer); endBuffer.mark(); @@ -104,6 +109,7 @@ public class CompressedLongsSupplierSerializer out.write(CompressedLongsIndexedSupplier.version); out.write(Ints.toByteArray(numInserted)); out.write(Ints.toByteArray(sizePer)); + out.write(new byte[]{compression.getId()}); ByteStreams.copy(flattener.combineStreams(), out); } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index 4a0b5723b41..e0865e8ec80 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -21,31 +21,216 @@ package io.druid.segment.data; import com.google.common.base.Throwables; import com.metamx.common.guava.CloseQuietly; +import com.google.common.collect.Maps; import com.ning.compress.lzf.ChunkEncoder; import com.ning.compress.lzf.LZFChunk; import com.ning.compress.lzf.LZFDecoder; import io.druid.collections.ResourceHolder; import io.druid.segment.CompressedPools; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import net.jpountz.lz4.LZ4SafeDecompressor; +import net.jpountz.lz4.LZ4UnknownSizeDecompressor; import java.io.IOException; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Map; /** */ public class CompressedObjectStrategy implements ObjectStrategy> { - private final ByteOrder order; - private final BufferConverter converter; + public static enum CompressionStrategy { + LZF ((byte)0x0) + { + @Override + public Decompressor getDecompressor() + { + return new LZFDecompressor(); + } + + @Override + public Compressor getCompressor() + { + return new LZFCompressor(); + } + }, + + LZ4 ((byte)0x1) { + @Override + public Decompressor getDecompressor() + { + return new LZ4Decompressor(); + } + + @Override + public Compressor getCompressor() + { + return new LZ4Compressor(); + } + }; + + final byte id; + + CompressionStrategy(byte id) { + this.id = id; + } + + public byte getId() + { + return id; + } + public abstract Compressor getCompressor(); + public abstract Decompressor getDecompressor(); + + static final Map idMap = Maps.newHashMap(); + static { + for(CompressionStrategy strategy : CompressionStrategy.values()) idMap.put(strategy.getId(), strategy); + } + + public static CompressionStrategy forId(byte id) + { + return idMap.get(id); + } + } + + public static interface Decompressor + { + /** + * Implementations of this method are expected to call out.flip() after writing to the output buffer + * + * @param in + * @param numBytes + * @param out + */ + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out); + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize); + } + + public static interface Compressor + { + /** + * Currently assumes buf is an array backed ByteBuffer + * + * @param bytes + * @return + */ + public byte[] compress(byte[] bytes); + } + + public static class LZFDecompressor implements Decompressor + { + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) + { + final byte[] bytes = new byte[numBytes]; + in.get(bytes); + + try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { + final byte[] outputBytes = outputBytesHolder.get(); + final int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes); + out.put(outputBytes, 0, numDecompressedBytes); + out.flip(); + } + catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) + { + decompress(in, numBytes, out); + } + } + + public static class LZFCompressor implements Compressor + { + @Override + public byte[] compress(byte[] bytes) + { + final ResourceHolder encoder = CompressedPools.getChunkEncoder(); + LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length); + CloseQuietly.close(encoder); + + return chunk.getData(); + } + } + + public static class LZ4Decompressor implements Decompressor + { + private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor(); + private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); + + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) + { + final byte[] bytes = new byte[numBytes]; + in.get(bytes); + + try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { + final byte[] outputBytes = outputBytesHolder.get(); + final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length); + + out.put(outputBytes, 0, numDecompressedBytes); + out.flip(); + } + catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) + { + final byte[] bytes = new byte[numBytes]; + in.get(bytes); + + try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { + final byte[] outputBytes = outputBytesHolder.get(); + lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize); + + out.put(outputBytes, 0, decompressedSize); + out.flip(); + } + catch (IOException e) { + Throwables.propagate(e); + } + } + } + + public static class LZ4Compressor implements Compressor + { + private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor(); + + @Override + public byte[] compress(byte[] bytes) + { + final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)]; + final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length); + final byte[] out = new byte[outputBytes]; + System.arraycopy(intermediate, 0, out, 0, outputBytes); + return out; + } + } + + protected final ByteOrder order; + protected final BufferConverter converter; + protected final Decompressor decompressor; + private final Compressor compressor; protected CompressedObjectStrategy( final ByteOrder order, - final BufferConverter converter + final BufferConverter converter, + final CompressionStrategy compression ) { this.order = order; this.converter = converter; + this.decompressor = compression.getDecompressor(); + this.compressor = compression.getCompressor(); } @Override @@ -58,56 +243,49 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public ResourceHolder fromByteBuffer(ByteBuffer buffer, int numBytes) { - byte[] bytes = new byte[numBytes]; - buffer.get(bytes); - final ResourceHolder bufHolder = CompressedPools.getByteBuf(order); final ByteBuffer buf = bufHolder.get(); buf.position(0); buf.limit(buf.capacity()); - try { - final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes(); - - byte[] outputBytes = outputBytesHolder.get(); - int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes); - buf.put(outputBytes, 0, numDecompressedBytes); - buf.flip(); - - CloseQuietly.close(outputBytesHolder); - - return new ResourceHolder() + decompress(buffer, numBytes, buf); + return new ResourceHolder() + { + @Override + public T get() { - @Override - public T get() - { - return converter.convert(buf); - } + return converter.convert(buf); + } - @Override - public void close() throws IOException - { - bufHolder.close(); - } - }; - } - catch (IOException e) { - throw Throwables.propagate(e); - } + @Override + public void close() throws IOException + { + bufHolder.close(); + } + }; + } + + protected void decompress( + ByteBuffer buffer, + int numBytes, + ByteBuffer buf + ) + { + decompressor.decompress(buffer, numBytes, buf); } @Override public byte[] toBytes(ResourceHolder holder) { T val = holder.get(); - ByteBuffer buf = ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order); + ByteBuffer buf = bufferFor(val); converter.combine(buf, val); + return compressor.compress(buf.array()); + } - final ResourceHolder encoder = CompressedPools.getChunkEncoder(); - LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length); - CloseQuietly.close(encoder); - - return chunk.getData(); + protected ByteBuffer bufferFor(T val) + { + return ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order); } @Override diff --git a/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java new file mode 100644 index 00000000000..c79e0edc615 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.data; + +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public abstract class FixedSizeCompressedObjectStrategy extends CompressedObjectStrategy +{ + protected FixedSizeCompressedObjectStrategy( + ByteOrder order, + BufferConverter converter, + CompressionStrategy compression + ) + { + super(order, converter, compression); + } + + public abstract int getSize(); + + @Override + protected ByteBuffer bufferFor(T val) + { + return ByteBuffer.allocate(converter.sizeOf(getSize())).order(order); + } + + @Override + protected void decompress(ByteBuffer buffer, int numBytes, ByteBuffer buf) + { + decompressor.decompress(buffer, numBytes, buf, converter.sizeOf(getSize())); + } +} diff --git a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java index d056158fa7d..e8ae40efbab 100644 --- a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java +++ b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java @@ -37,6 +37,7 @@ import java.util.List; */ public class InMemoryCompressedFloats implements IndexedFloats { + public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.CompressionStrategy.LZ4; private final CompressedFloatBufferObjectStrategy strategy; private final int sizePer; @@ -56,7 +57,11 @@ public class InMemoryCompressedFloats implements IndexedFloats ) { this.sizePer = sizePer; - strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(order); + strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder( + order, + COMPRESSION, + sizePer + ); endBuffer = FloatBuffer.allocate(sizePer); endBuffer.mark(); @@ -184,7 +189,8 @@ public class InMemoryCompressedFloats implements IndexedFloats Arrays.>asList(StupidResourceHolder.create(endBufCopy)) ), strategy - ) + ), + COMPRESSION ); } diff --git a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java index e0ef6fac375..9fd314569d5 100644 --- a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java +++ b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java @@ -38,6 +38,7 @@ import java.util.List; */ public class InMemoryCompressedLongs implements IndexedLongs { + public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.CompressionStrategy.LZ4; private final CompressedLongBufferObjectStrategy strategy; private final int sizePer; @@ -57,7 +58,11 @@ public class InMemoryCompressedLongs implements IndexedLongs ) { this.sizePer = sizePer; - strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(order); + strategy = CompressedLongBufferObjectStrategy.getBufferForOrder( + order, + COMPRESSION, + sizePer + ); endBuffer = LongBuffer.allocate(sizePer); endBuffer.mark(); @@ -195,7 +200,8 @@ public class InMemoryCompressedLongs implements IndexedLongs Arrays.>asList(StupidResourceHolder.create(longBufCopy)) ), strategy - ) + ), + COMPRESSION ); } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java index d9cf4c5b6ab..e481ee8cef6 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java @@ -68,7 +68,8 @@ public class CompressedFloatsIndexedSupplierTest supplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( FloatBuffer.wrap(vals), 5, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + CompressedObjectStrategy.CompressionStrategy.LZ4 ); indexed = supplier.get(); @@ -82,7 +83,7 @@ public class CompressedFloatsIndexedSupplierTest ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( - FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder() + FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4 ); theSupplier.writeToChannel(Channels.newChannel(baos)); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java index 7b670110269..3b888ae132f 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java @@ -39,13 +39,19 @@ public class CompressedFloatsSupplierSerializerTest public void testSanity() throws Exception { final ByteOrder order = ByteOrder.nativeOrder(); + final int sizePer = 999; CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer( - 999, + sizePer, new GenericIndexedWriter>( new IOPeonForTesting(), "test", - CompressedFloatBufferObjectStrategy.getBufferForOrder(order) - ) + CompressedFloatBufferObjectStrategy.getBufferForOrder( + order, + CompressedObjectStrategy.CompressionStrategy.LZ4, + sizePer + ) + ), + CompressedObjectStrategy.CompressionStrategy.LZ4 ); serializer.open(); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java index 768100fd559..dd33d532498 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java @@ -66,7 +66,8 @@ public class CompressedLongsIndexedSupplierTest supplier = CompressedLongsIndexedSupplier.fromLongBuffer( LongBuffer.wrap(vals), 5, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + CompressedObjectStrategy.CompressionStrategy.LZ4 ); indexed = supplier.get(); @@ -78,7 +79,7 @@ public class CompressedLongsIndexedSupplierTest ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer( - LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder() + LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4 ); theSupplier.writeToChannel(Channels.newChannel(baos)); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java index 029297e73e8..ee13c55087e 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java @@ -39,13 +39,15 @@ public class CompressedLongsSupplierSerializerTest public void testSanity() throws Exception { final ByteOrder order = ByteOrder.nativeOrder(); + final int sizePer = 999; CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer( - 999, + sizePer, new GenericIndexedWriter>( new IOPeonForTesting(), "test", - CompressedLongBufferObjectStrategy.getBufferForOrder(order) - ) + CompressedLongBufferObjectStrategy.getBufferForOrder(order, CompressedObjectStrategy.CompressionStrategy.LZ4, sizePer) + ), + CompressedObjectStrategy.CompressionStrategy.LZ4 ); serializer.open(); From 991e1828b05326eb2865929f3f0623108e79b0a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Aug 2014 15:50:04 -0700 Subject: [PATCH 2/3] make LZ4 the default compression strategy - LZ4 is now hardwired to be the default strategy - Rework tests to test all available compression strategies --- .../segment/FloatMetricColumnSerializer.java | 2 +- .../java/io/druid/segment/IndexMerger.java | 2 +- .../data/CompressedObjectStrategy.java | 2 + .../data/InMemoryCompressedFloats.java | 2 +- .../segment/data/InMemoryCompressedLongs.java | 2 +- .../CompressedFloatsIndexedSupplierTest.java | 16 ++++-- ...ompressedFloatsSupplierSerializerTest.java | 16 ++++-- .../CompressedLongsIndexedSupplierTest.java | 11 ++-- ...CompressedLongsSupplierSerializerTest.java | 16 ++++-- .../segment/data/CompressionStrategyTest.java | 52 +++++++++++++++++++ 10 files changed, 99 insertions(+), 22 deletions(-) create mode 100644 processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java diff --git a/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java b/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java index 520708fe015..2bc50c19d30 100644 --- a/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java +++ b/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java @@ -53,7 +53,7 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer { writer = CompressedFloatsSupplierSerializer.create( ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER, - CompressedObjectStrategy.CompressionStrategy.LZ4 // TODO define this somewhere else + CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY ); writer.open(); diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 87cfa25a2ed..b817e3ea089 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -596,7 +596,7 @@ public class IndexMerger CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create( ioPeon, "little_end_time", IndexIO.BYTE_ORDER, - CompressedObjectStrategy.CompressionStrategy.LZ4 // TODO define this somewhere else + CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY ); timeWriter.open(); diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index e0865e8ec80..36a3ad4876e 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -42,6 +42,8 @@ import java.util.Map; */ public class CompressedObjectStrategy implements ObjectStrategy> { + public static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.LZ4; + public static enum CompressionStrategy { LZF ((byte)0x0) { diff --git a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java index e8ae40efbab..d86a0f4364e 100644 --- a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java +++ b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java @@ -37,7 +37,7 @@ import java.util.List; */ public class InMemoryCompressedFloats implements IndexedFloats { - public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.CompressionStrategy.LZ4; + public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY; private final CompressedFloatBufferObjectStrategy strategy; private final int sizePer; diff --git a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java index 9fd314569d5..266475636d3 100644 --- a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java +++ b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java @@ -38,7 +38,7 @@ import java.util.List; */ public class InMemoryCompressedLongs implements IndexedLongs { - public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.CompressionStrategy.LZ4; + public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY; private final CompressedLongBufferObjectStrategy strategy; private final int sizePer; diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java index e481ee8cef6..0982f76f8bf 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java @@ -25,6 +25,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -36,10 +38,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -/** - */ -public class CompressedFloatsIndexedSupplierTest +@RunWith(Parameterized.class) +public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest { + public CompressedFloatsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + super(compressionStrategy); + } + private IndexedFloats indexed; private CompressedFloatsIndexedSupplier supplier; private float[] vals; @@ -69,7 +75,7 @@ public class CompressedFloatsIndexedSupplierTest FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), - CompressedObjectStrategy.CompressionStrategy.LZ4 + compressionStrategy ); indexed = supplier.get(); @@ -83,7 +89,7 @@ public class CompressedFloatsIndexedSupplierTest ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( - FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4 + FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy ); theSupplier.writeToChannel(Channels.newChannel(baos)); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java index 3b888ae132f..e61c01be8e5 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java @@ -23,6 +23,8 @@ import com.google.common.io.OutputSupplier; import io.druid.collections.ResourceHolder; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -31,10 +33,14 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.FloatBuffer; -/** - */ -public class CompressedFloatsSupplierSerializerTest +@RunWith(Parameterized.class) +public class CompressedFloatsSupplierSerializerTest extends CompressionStrategyTest { + public CompressedFloatsSupplierSerializerTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + super(compressionStrategy); + } + @Test public void testSanity() throws Exception { @@ -47,11 +53,11 @@ public class CompressedFloatsSupplierSerializerTest "test", CompressedFloatBufferObjectStrategy.getBufferForOrder( order, - CompressedObjectStrategy.CompressionStrategy.LZ4, + compressionStrategy, sizePer ) ), - CompressedObjectStrategy.CompressionStrategy.LZ4 + compressionStrategy ); serializer.open(); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java index dd33d532498..fc29e284443 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java @@ -38,8 +38,13 @@ import java.util.concurrent.atomic.AtomicReference; /** */ -public class CompressedLongsIndexedSupplierTest +public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest { + public CompressedLongsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + super(compressionStrategy); + } + private IndexedLongs indexed; private CompressedLongsIndexedSupplier supplier; private long[] vals; @@ -67,7 +72,7 @@ public class CompressedLongsIndexedSupplierTest LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), - CompressedObjectStrategy.CompressionStrategy.LZ4 + compressionStrategy ); indexed = supplier.get(); @@ -79,7 +84,7 @@ public class CompressedLongsIndexedSupplierTest ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer( - LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4 + LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy ); theSupplier.writeToChannel(Channels.newChannel(baos)); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java index ee13c55087e..bb5d6ec444e 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java @@ -23,6 +23,8 @@ import com.google.common.io.OutputSupplier; import io.druid.collections.ResourceHolder; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -31,10 +33,14 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.LongBuffer; -/** - */ -public class CompressedLongsSupplierSerializerTest +@RunWith(Parameterized.class) +public class CompressedLongsSupplierSerializerTest extends CompressionStrategyTest { + public CompressedLongsSupplierSerializerTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + super(compressionStrategy); + } + @Test public void testSanity() throws Exception { @@ -45,9 +51,9 @@ public class CompressedLongsSupplierSerializerTest new GenericIndexedWriter>( new IOPeonForTesting(), "test", - CompressedLongBufferObjectStrategy.getBufferForOrder(order, CompressedObjectStrategy.CompressionStrategy.LZ4, sizePer) + CompressedLongBufferObjectStrategy.getBufferForOrder(order, compressionStrategy, sizePer) ), - CompressedObjectStrategy.CompressionStrategy.LZ4 + compressionStrategy ); serializer.open(); diff --git a/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java b/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java new file mode 100644 index 00000000000..845ed8369c7 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.data; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.junit.runners.Parameterized; + +import java.util.Arrays; + +public class CompressionStrategyTest +{ + @Parameterized.Parameters + public static Iterable compressionStrategies() + { + return Iterables.transform( + Arrays.asList(CompressedObjectStrategy.CompressionStrategy.values()), + new Function() + { + @Override + public Object[] apply(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + return new Object[]{compressionStrategy}; + } + } + ); + } + + protected final CompressedObjectStrategy.CompressionStrategy compressionStrategy; + + public CompressionStrategyTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + this.compressionStrategy = compressionStrategy; + } +} From 6fa611c2620d285934b5abf6feda832ccc1a3543 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Aug 2014 16:13:39 -0700 Subject: [PATCH 3/3] refactor FixedSizeCompressedObjectStrategy --- .../data/CompressedFloatBufferObjectStrategy.java | 12 ++---------- .../data/CompressedLongBufferObjectStrategy.java | 12 ++---------- .../data/FixedSizeCompressedObjectStrategy.java | 10 ++++++++-- 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java index e6294462afe..b0680b2ec23 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java @@ -35,8 +35,6 @@ public class CompressedFloatBufferObjectStrategy extends FixedSizeCompressedObje return new CompressedFloatBufferObjectStrategy(order, compression, sizePer); } - private final int sizePer; - private CompressedFloatBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer) { super( @@ -67,14 +65,8 @@ public class CompressedFloatBufferObjectStrategy extends FixedSizeCompressedObje return into.asFloatBuffer().put(from); } }, - compression + compression, + sizePer ); - this.sizePer = sizePer; - } - - @Override - public int getSize() - { - return sizePer; } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java index b0b511278f1..13fd264eba3 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java @@ -35,8 +35,6 @@ public class CompressedLongBufferObjectStrategy extends FixedSizeCompressedObjec return new CompressedLongBufferObjectStrategy(order, compression, sizePer); } - private final int sizePer; - private CompressedLongBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer) { super( @@ -67,14 +65,8 @@ public class CompressedLongBufferObjectStrategy extends FixedSizeCompressedObjec return into.asLongBuffer().put(from); } }, - compression + compression, + sizePer ); - this.sizePer = sizePer; - } - - @Override - public int getSize() - { - return sizePer; } } diff --git a/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java index c79e0edc615..3efc1ba06ac 100644 --- a/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java @@ -25,16 +25,22 @@ import java.nio.ByteOrder; public abstract class FixedSizeCompressedObjectStrategy extends CompressedObjectStrategy { + private final int sizePer; + protected FixedSizeCompressedObjectStrategy( ByteOrder order, BufferConverter converter, - CompressionStrategy compression + CompressionStrategy compression, + int sizePer ) { super(order, converter, compression); + this.sizePer = sizePer; } - public abstract int getSize(); + public int getSize() { + return sizePer; + } @Override protected ByteBuffer bufferFor(T val)