diff --git a/pom.xml b/pom.xml
index 31bd84a4f63..6e8905433f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -384,7 +384,7 @@
net.jpountz.lz4
lz4
- 1.1.2
+ 1.2.0
com.google.protobuf
diff --git a/processing/pom.xml b/processing/pom.xml
index a92d70564ee..312b9b408cd 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -82,6 +82,10 @@
com.davekoelle
alphanum
+
+ net.jpountz.lz4
+ lz4
+
diff --git a/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java b/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java
index 4caa6b95fe2..2bc50c19d30 100644
--- a/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java
+++ b/processing/src/main/java/io/druid/segment/FloatMetricColumnSerializer.java
@@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.io.Files;
import io.druid.segment.data.CompressedFloatsSupplierSerializer;
+import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon;
import java.io.File;
@@ -51,7 +52,8 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
public void open() throws IOException
{
writer = CompressedFloatsSupplierSerializer.create(
- ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER
+ ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER,
+ CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
writer.open();
diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java
index 99f854da9af..b817e3ea089 100644
--- a/processing/src/main/java/io/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/io/druid/segment/IndexMerger.java
@@ -54,6 +54,7 @@ import io.druid.query.aggregation.ToLowerCaseAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedLongsSupplierSerializer;
+import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter;
@@ -594,7 +595,8 @@ public class IndexMerger
Iterable theRows = rowMergerFn.apply(boats);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
- ioPeon, "little_end_time", IndexIO.BYTE_ORDER
+ ioPeon, "little_end_time", IndexIO.BYTE_ORDER,
+ CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
timeWriter.open();
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java
index 7a9906cd364..b0680b2ec23 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatBufferObjectStrategy.java
@@ -28,14 +28,14 @@ import java.nio.FloatBuffer;
/**
*/
-public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrategy
+public class CompressedFloatBufferObjectStrategy extends FixedSizeCompressedObjectStrategy
{
- public static CompressedFloatBufferObjectStrategy getBufferForOrder(ByteOrder order)
+ public static CompressedFloatBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
- return new CompressedFloatBufferObjectStrategy(order);
+ return new CompressedFloatBufferObjectStrategy(order, compression, sizePer);
}
- private CompressedFloatBufferObjectStrategy(final ByteOrder order)
+ private CompressedFloatBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
super(
order,
@@ -64,7 +64,9 @@ public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrateg
{
return into.asFloatBuffer().put(from);
}
- }
+ },
+ compression,
+ sizePer
);
}
}
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java
index 1beccc5426f..35a3b03f0b2 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java
@@ -40,22 +40,26 @@ import java.util.Iterator;
*/
public class CompressedFloatsIndexedSupplier implements Supplier
{
- public static final byte version = 0x1;
+ public static final byte LZF_VERSION = 0x1;
+ public static final byte version = 0x2;
public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2);
private final int totalSize;
private final int sizePer;
private final GenericIndexed> baseFloatBuffers;
+ private final CompressedObjectStrategy.CompressionStrategy compression;
CompressedFloatsIndexedSupplier(
int totalSize,
int sizePer,
- GenericIndexed> baseFloatBuffers
+ GenericIndexed> baseFloatBuffers,
+ CompressedObjectStrategy.CompressionStrategy compression
)
{
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseFloatBuffers = baseFloatBuffers;
+ this.compression = compression;
}
public int size()
@@ -151,7 +155,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier
public long getSerializedSize()
{
- return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4;
+ return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4 + 1;
}
public void writeToChannel(WritableByteChannel channel) throws IOException
@@ -159,6 +163,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier
channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
+ channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
baseFloatBuffers.writeToChannel(channel);
}
@@ -167,7 +172,8 @@ public class CompressedFloatsIndexedSupplier implements Supplier
return new CompressedFloatsIndexedSupplier(
totalSize,
sizePer,
- GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order))
+ GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
+ compression
);
}
@@ -191,23 +197,53 @@ public class CompressedFloatsIndexedSupplier implements Supplier
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) {
+ final int totalSize = buffer.getInt();
+ final int sizePer = buffer.getInt();
+ final CompressedObjectStrategy.CompressionStrategy compression =
+ CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
+
return new CompressedFloatsIndexedSupplier(
- buffer.getInt(),
- buffer.getInt(),
- GenericIndexed.read(buffer, CompressedFloatBufferObjectStrategy.getBufferForOrder(order))
+ totalSize,
+ sizePer,
+ GenericIndexed.read(
+ buffer,
+ CompressedFloatBufferObjectStrategy.getBufferForOrder(
+ order,
+ compression,
+ sizePer
+ )
+ ),
+ compression
+ );
+ } else if (versionFromBuffer == LZF_VERSION) {
+ final int totalSize = buffer.getInt();
+ final int sizePer = buffer.getInt();
+ final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF;
+ return new CompressedFloatsIndexedSupplier(
+ totalSize,
+ sizePer,
+ GenericIndexed.read(
+ buffer,
+ CompressedFloatBufferObjectStrategy.getBufferForOrder(
+ order,
+ compression,
+ sizePer
+ )
+ ),
+ compression
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
- public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order)
+ public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression)
{
- return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order);
+ return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order, compression);
}
public static CompressedFloatsIndexedSupplier fromFloatBuffer(
- final FloatBuffer buffer, final int chunkFactor, final ByteOrder order
+ final FloatBuffer buffer, final int chunkFactor, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
)
{
Preconditions.checkArgument(
@@ -254,8 +290,9 @@ public class CompressedFloatsIndexedSupplier implements Supplier
};
}
},
- CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
- )
+ CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, chunkFactor)
+ ),
+ compression
);
}
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java
index da967898dba..89fa28d2c84 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatsSupplierSerializer.java
@@ -36,27 +36,29 @@ import java.nio.FloatBuffer;
public class CompressedFloatsSupplierSerializer
{
public static CompressedFloatsSupplierSerializer create(
- IOPeon ioPeon, final String filenameBase, final ByteOrder order
+ IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException
{
- return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order);
+ return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order, compression);
}
public static CompressedFloatsSupplierSerializer create(
- IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order
+ IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException
{
final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer(
sizePer,
new GenericIndexedWriter>(
- ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
- )
+ ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)
+ ),
+ compression
);
return retVal;
}
private final int sizePer;
private final GenericIndexedWriter> flattener;
+ private final CompressedObjectStrategy.CompressionStrategy compression;
private int numInserted = 0;
@@ -64,11 +66,13 @@ public class CompressedFloatsSupplierSerializer
public CompressedFloatsSupplierSerializer(
int sizePer,
- GenericIndexedWriter> flattener
+ GenericIndexedWriter> flattener,
+ CompressedObjectStrategy.CompressionStrategy compression
)
{
this.sizePer = sizePer;
this.flattener = flattener;
+ this.compression = compression;
endBuffer = FloatBuffer.allocate(sizePer);
endBuffer.mark();
@@ -110,6 +114,7 @@ public class CompressedFloatsSupplierSerializer
out.write(CompressedFloatsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer));
+ out.write(new byte[]{compression.getId()});
ByteStreams.copy(flattener.combineStreams(), out);
}
}
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java
index 823c86a0516..13fd264eba3 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedLongBufferObjectStrategy.java
@@ -28,14 +28,14 @@ import java.nio.LongBuffer;
/**
*/
-public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy
+public class CompressedLongBufferObjectStrategy extends FixedSizeCompressedObjectStrategy
{
- public static CompressedLongBufferObjectStrategy getBufferForOrder(ByteOrder order)
+ public static CompressedLongBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
- return new CompressedLongBufferObjectStrategy(order);
+ return new CompressedLongBufferObjectStrategy(order, compression, sizePer);
}
- private CompressedLongBufferObjectStrategy(final ByteOrder order)
+ private CompressedLongBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
super(
order,
@@ -64,8 +64,9 @@ public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy
{
return into.asLongBuffer().put(from);
}
- }
+ },
+ compression,
+ sizePer
);
}
-
}
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java
index 8ad267168a5..56998d09886 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java
@@ -40,21 +40,25 @@ import java.util.Iterator;
*/
public class CompressedLongsIndexedSupplier implements Supplier
{
- public static final byte version = 0x1;
+ public static final byte LZF_VERSION = 0x1;
+ public static final byte version = 0x2;
private final int totalSize;
private final int sizePer;
private final GenericIndexed> baseLongBuffers;
+ private final CompressedObjectStrategy.CompressionStrategy compression;
CompressedLongsIndexedSupplier(
int totalSize,
int sizePer,
- GenericIndexed> baseLongBuffers
+ GenericIndexed> baseLongBuffers,
+ CompressedObjectStrategy.CompressionStrategy compression
)
{
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseLongBuffers = baseLongBuffers;
+ this.compression = compression;
}
public int size()
@@ -162,7 +166,7 @@ public class CompressedLongsIndexedSupplier implements Supplier
public long getSerializedSize()
{
- return baseLongBuffers.getSerializedSize() + 1 + 4 + 4;
+ return baseLongBuffers.getSerializedSize() + 1 + 4 + 4 + 1;
}
public void writeToChannel(WritableByteChannel channel) throws IOException
@@ -170,15 +174,17 @@ public class CompressedLongsIndexedSupplier implements Supplier
channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
+ channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
baseLongBuffers.writeToChannel(channel);
}
- public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order)
+ public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression)
{
return new CompressedLongsIndexedSupplier(
totalSize,
sizePer,
- GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order))
+ GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
+ compression
);
}
@@ -196,23 +202,37 @@ public class CompressedLongsIndexedSupplier implements Supplier
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) {
+ final int totalSize = buffer.getInt();
+ final int sizePer = buffer.getInt();
+ final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
return new CompressedLongsIndexedSupplier(
- buffer.getInt(),
- buffer.getInt(),
- GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order))
+ totalSize,
+ sizePer,
+ GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
+ compression
+ );
+ } else if (versionFromBuffer == LZF_VERSION) {
+ final int totalSize = buffer.getInt();
+ final int sizePer = buffer.getInt();
+ final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF;
+ return new CompressedLongsIndexedSupplier(
+ totalSize,
+ sizePer,
+ GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
+ compression
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
- public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder)
+ public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression)
{
- return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder);
+ return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder, compression);
}
public static CompressedLongsIndexedSupplier fromLongBuffer(
- final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder
+ final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
)
{
Preconditions.checkArgument(
@@ -259,8 +279,9 @@ public class CompressedLongsIndexedSupplier implements Supplier
};
}
},
- CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder)
- )
+ CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
+ ),
+ compression
);
}
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java
index b0e63a8e391..97a7545009a 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java
@@ -37,20 +37,23 @@ import java.nio.LongBuffer;
public class CompressedLongsSupplierSerializer
{
public static CompressedLongsSupplierSerializer create(
- IOPeon ioPeon, final String filenameBase, final ByteOrder order
+ IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException
{
+ final int sizePer = 0xFFFF / Longs.BYTES;
final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer(
- 0xFFFF / Longs.BYTES,
+ sizePer,
new GenericIndexedWriter>(
- ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order)
- )
+ ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)
+ ),
+ compression
);
return retVal;
}
private final int sizePer;
private final GenericIndexedWriter> flattener;
+ private final CompressedObjectStrategy.CompressionStrategy compression;
private int numInserted = 0;
@@ -58,11 +61,13 @@ public class CompressedLongsSupplierSerializer
public CompressedLongsSupplierSerializer(
int sizePer,
- GenericIndexedWriter> flattener
+ GenericIndexedWriter> flattener,
+ CompressedObjectStrategy.CompressionStrategy compression
)
{
this.sizePer = sizePer;
this.flattener = flattener;
+ this.compression = compression;
endBuffer = LongBuffer.allocate(sizePer);
endBuffer.mark();
@@ -104,6 +109,7 @@ public class CompressedLongsSupplierSerializer
out.write(CompressedLongsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer));
+ out.write(new byte[]{compression.getId()});
ByteStreams.copy(flattener.combineStreams(), out);
}
}
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java
index 4a0b5723b41..36a3ad4876e 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java
@@ -21,31 +21,218 @@ package io.druid.segment.data;
import com.google.common.base.Throwables;
import com.metamx.common.guava.CloseQuietly;
+import com.google.common.collect.Maps;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFDecoder;
import io.druid.collections.ResourceHolder;
import io.druid.segment.CompressedPools;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.lz4.LZ4UnknownSizeDecompressor;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.Map;
/**
*/
public class CompressedObjectStrategy implements ObjectStrategy>
{
- private final ByteOrder order;
- private final BufferConverter converter;
+ public static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.LZ4;
+
+ public static enum CompressionStrategy {
+ LZF ((byte)0x0)
+ {
+ @Override
+ public Decompressor getDecompressor()
+ {
+ return new LZFDecompressor();
+ }
+
+ @Override
+ public Compressor getCompressor()
+ {
+ return new LZFCompressor();
+ }
+ },
+
+ LZ4 ((byte)0x1) {
+ @Override
+ public Decompressor getDecompressor()
+ {
+ return new LZ4Decompressor();
+ }
+
+ @Override
+ public Compressor getCompressor()
+ {
+ return new LZ4Compressor();
+ }
+ };
+
+ final byte id;
+
+ CompressionStrategy(byte id) {
+ this.id = id;
+ }
+
+ public byte getId()
+ {
+ return id;
+ }
+ public abstract Compressor getCompressor();
+ public abstract Decompressor getDecompressor();
+
+ static final Map idMap = Maps.newHashMap();
+ static {
+ for(CompressionStrategy strategy : CompressionStrategy.values()) idMap.put(strategy.getId(), strategy);
+ }
+
+ public static CompressionStrategy forId(byte id)
+ {
+ return idMap.get(id);
+ }
+ }
+
+ public static interface Decompressor
+ {
+ /**
+ * Implementations of this method are expected to call out.flip() after writing to the output buffer
+ *
+ * @param in
+ * @param numBytes
+ * @param out
+ */
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out);
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize);
+ }
+
+ public static interface Compressor
+ {
+ /**
+ * Currently assumes buf is an array backed ByteBuffer
+ *
+ * @param bytes
+ * @return
+ */
+ public byte[] compress(byte[] bytes);
+ }
+
+ public static class LZFDecompressor implements Decompressor
+ {
+ @Override
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+ {
+ final byte[] bytes = new byte[numBytes];
+ in.get(bytes);
+
+ try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) {
+ final byte[] outputBytes = outputBytesHolder.get();
+ final int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes);
+ out.put(outputBytes, 0, numDecompressedBytes);
+ out.flip();
+ }
+ catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
+ {
+ decompress(in, numBytes, out);
+ }
+ }
+
+ public static class LZFCompressor implements Compressor
+ {
+ @Override
+ public byte[] compress(byte[] bytes)
+ {
+ final ResourceHolder encoder = CompressedPools.getChunkEncoder();
+ LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
+ CloseQuietly.close(encoder);
+
+ return chunk.getData();
+ }
+ }
+
+ public static class LZ4Decompressor implements Decompressor
+ {
+ private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
+ private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
+
+ @Override
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+ {
+ final byte[] bytes = new byte[numBytes];
+ in.get(bytes);
+
+ try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) {
+ final byte[] outputBytes = outputBytesHolder.get();
+ final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
+
+ out.put(outputBytes, 0, numDecompressedBytes);
+ out.flip();
+ }
+ catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
+ {
+ final byte[] bytes = new byte[numBytes];
+ in.get(bytes);
+
+ try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) {
+ final byte[] outputBytes = outputBytesHolder.get();
+ lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
+
+ out.put(outputBytes, 0, decompressedSize);
+ out.flip();
+ }
+ catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+ }
+
+ public static class LZ4Compressor implements Compressor
+ {
+ private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
+
+ @Override
+ public byte[] compress(byte[] bytes)
+ {
+ final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
+ final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
+ final byte[] out = new byte[outputBytes];
+ System.arraycopy(intermediate, 0, out, 0, outputBytes);
+ return out;
+ }
+ }
+
+ protected final ByteOrder order;
+ protected final BufferConverter converter;
+ protected final Decompressor decompressor;
+ private final Compressor compressor;
protected CompressedObjectStrategy(
final ByteOrder order,
- final BufferConverter converter
+ final BufferConverter converter,
+ final CompressionStrategy compression
)
{
this.order = order;
this.converter = converter;
+ this.decompressor = compression.getDecompressor();
+ this.compressor = compression.getCompressor();
}
@Override
@@ -58,56 +245,49 @@ public class CompressedObjectStrategy implements ObjectStrateg
@Override
public ResourceHolder fromByteBuffer(ByteBuffer buffer, int numBytes)
{
- byte[] bytes = new byte[numBytes];
- buffer.get(bytes);
-
final ResourceHolder bufHolder = CompressedPools.getByteBuf(order);
final ByteBuffer buf = bufHolder.get();
buf.position(0);
buf.limit(buf.capacity());
- try {
- final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes();
-
- byte[] outputBytes = outputBytesHolder.get();
- int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes);
- buf.put(outputBytes, 0, numDecompressedBytes);
- buf.flip();
-
- CloseQuietly.close(outputBytesHolder);
-
- return new ResourceHolder()
+ decompress(buffer, numBytes, buf);
+ return new ResourceHolder()
+ {
+ @Override
+ public T get()
{
- @Override
- public T get()
- {
- return converter.convert(buf);
- }
+ return converter.convert(buf);
+ }
- @Override
- public void close() throws IOException
- {
- bufHolder.close();
- }
- };
- }
- catch (IOException e) {
- throw Throwables.propagate(e);
- }
+ @Override
+ public void close() throws IOException
+ {
+ bufHolder.close();
+ }
+ };
+ }
+
+ protected void decompress(
+ ByteBuffer buffer,
+ int numBytes,
+ ByteBuffer buf
+ )
+ {
+ decompressor.decompress(buffer, numBytes, buf);
}
@Override
public byte[] toBytes(ResourceHolder holder)
{
T val = holder.get();
- ByteBuffer buf = ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order);
+ ByteBuffer buf = bufferFor(val);
converter.combine(buf, val);
+ return compressor.compress(buf.array());
+ }
- final ResourceHolder encoder = CompressedPools.getChunkEncoder();
- LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length);
- CloseQuietly.close(encoder);
-
- return chunk.getData();
+ protected ByteBuffer bufferFor(T val)
+ {
+ return ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order);
}
@Override
diff --git a/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java
new file mode 100644
index 00000000000..3efc1ba06ac
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/data/FixedSizeCompressedObjectStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.segment.data;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public abstract class FixedSizeCompressedObjectStrategy extends CompressedObjectStrategy
+{
+ private final int sizePer;
+
+ protected FixedSizeCompressedObjectStrategy(
+ ByteOrder order,
+ BufferConverter converter,
+ CompressionStrategy compression,
+ int sizePer
+ )
+ {
+ super(order, converter, compression);
+ this.sizePer = sizePer;
+ }
+
+ public int getSize() {
+ return sizePer;
+ }
+
+ @Override
+ protected ByteBuffer bufferFor(T val)
+ {
+ return ByteBuffer.allocate(converter.sizeOf(getSize())).order(order);
+ }
+
+ @Override
+ protected void decompress(ByteBuffer buffer, int numBytes, ByteBuffer buf)
+ {
+ decompressor.decompress(buffer, numBytes, buf, converter.sizeOf(getSize()));
+ }
+}
diff --git a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java
index d056158fa7d..d86a0f4364e 100644
--- a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java
+++ b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedFloats.java
@@ -37,6 +37,7 @@ import java.util.List;
*/
public class InMemoryCompressedFloats implements IndexedFloats
{
+ public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY;
private final CompressedFloatBufferObjectStrategy strategy;
private final int sizePer;
@@ -56,7 +57,11 @@ public class InMemoryCompressedFloats implements IndexedFloats
)
{
this.sizePer = sizePer;
- strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(order);
+ strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(
+ order,
+ COMPRESSION,
+ sizePer
+ );
endBuffer = FloatBuffer.allocate(sizePer);
endBuffer.mark();
@@ -184,7 +189,8 @@ public class InMemoryCompressedFloats implements IndexedFloats
Arrays.>asList(StupidResourceHolder.create(endBufCopy))
),
strategy
- )
+ ),
+ COMPRESSION
);
}
diff --git a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java
index e0ef6fac375..266475636d3 100644
--- a/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java
+++ b/processing/src/main/java/io/druid/segment/data/InMemoryCompressedLongs.java
@@ -38,6 +38,7 @@ import java.util.List;
*/
public class InMemoryCompressedLongs implements IndexedLongs
{
+ public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY;
private final CompressedLongBufferObjectStrategy strategy;
private final int sizePer;
@@ -57,7 +58,11 @@ public class InMemoryCompressedLongs implements IndexedLongs
)
{
this.sizePer = sizePer;
- strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(order);
+ strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(
+ order,
+ COMPRESSION,
+ sizePer
+ );
endBuffer = LongBuffer.allocate(sizePer);
endBuffer.mark();
@@ -195,7 +200,8 @@ public class InMemoryCompressedLongs implements IndexedLongs
Arrays.>asList(StupidResourceHolder.create(longBufCopy))
),
strategy
- )
+ ),
+ COMPRESSION
);
}
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java
index d9cf4c5b6ab..0982f76f8bf 100644
--- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java
+++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java
@@ -25,6 +25,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -36,10 +38,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-/**
- */
-public class CompressedFloatsIndexedSupplierTest
+@RunWith(Parameterized.class)
+public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest
{
+ public CompressedFloatsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
+ {
+ super(compressionStrategy);
+ }
+
private IndexedFloats indexed;
private CompressedFloatsIndexedSupplier supplier;
private float[] vals;
@@ -68,7 +74,8 @@ public class CompressedFloatsIndexedSupplierTest
supplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
FloatBuffer.wrap(vals),
5,
- ByteOrder.nativeOrder()
+ ByteOrder.nativeOrder(),
+ compressionStrategy
);
indexed = supplier.get();
@@ -82,7 +89,7 @@ public class CompressedFloatsIndexedSupplierTest
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
- FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder()
+ FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy
);
theSupplier.writeToChannel(Channels.newChannel(baos));
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java
index 7b670110269..e61c01be8e5 100644
--- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java
+++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSupplierSerializerTest.java
@@ -23,6 +23,8 @@ import com.google.common.io.OutputSupplier;
import io.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -31,21 +33,31 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.FloatBuffer;
-/**
- */
-public class CompressedFloatsSupplierSerializerTest
+@RunWith(Parameterized.class)
+public class CompressedFloatsSupplierSerializerTest extends CompressionStrategyTest
{
+ public CompressedFloatsSupplierSerializerTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
+ {
+ super(compressionStrategy);
+ }
+
@Test
public void testSanity() throws Exception
{
final ByteOrder order = ByteOrder.nativeOrder();
+ final int sizePer = 999;
CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer(
- 999,
+ sizePer,
new GenericIndexedWriter>(
new IOPeonForTesting(),
"test",
- CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
- )
+ CompressedFloatBufferObjectStrategy.getBufferForOrder(
+ order,
+ compressionStrategy,
+ sizePer
+ )
+ ),
+ compressionStrategy
);
serializer.open();
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java
index 768100fd559..fc29e284443 100644
--- a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java
+++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java
@@ -38,8 +38,13 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
-public class CompressedLongsIndexedSupplierTest
+public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
{
+ public CompressedLongsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
+ {
+ super(compressionStrategy);
+ }
+
private IndexedLongs indexed;
private CompressedLongsIndexedSupplier supplier;
private long[] vals;
@@ -66,7 +71,8 @@ public class CompressedLongsIndexedSupplierTest
supplier = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(vals),
5,
- ByteOrder.nativeOrder()
+ ByteOrder.nativeOrder(),
+ compressionStrategy
);
indexed = supplier.get();
@@ -78,7 +84,7 @@ public class CompressedLongsIndexedSupplierTest
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer(
- LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder()
+ LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy
);
theSupplier.writeToChannel(Channels.newChannel(baos));
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java
index 029297e73e8..bb5d6ec444e 100644
--- a/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java
+++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsSupplierSerializerTest.java
@@ -23,6 +23,8 @@ import com.google.common.io.OutputSupplier;
import io.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -31,21 +33,27 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
-/**
- */
-public class CompressedLongsSupplierSerializerTest
+@RunWith(Parameterized.class)
+public class CompressedLongsSupplierSerializerTest extends CompressionStrategyTest
{
+ public CompressedLongsSupplierSerializerTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
+ {
+ super(compressionStrategy);
+ }
+
@Test
public void testSanity() throws Exception
{
final ByteOrder order = ByteOrder.nativeOrder();
+ final int sizePer = 999;
CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer(
- 999,
+ sizePer,
new GenericIndexedWriter>(
new IOPeonForTesting(),
"test",
- CompressedLongBufferObjectStrategy.getBufferForOrder(order)
- )
+ CompressedLongBufferObjectStrategy.getBufferForOrder(order, compressionStrategy, sizePer)
+ ),
+ compressionStrategy
);
serializer.open();
diff --git a/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java b/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java
new file mode 100644
index 00000000000..845ed8369c7
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java
@@ -0,0 +1,52 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+
+public class CompressionStrategyTest
+{
+ @Parameterized.Parameters
+ public static Iterable