initial support for LZ4 compression

This commit is contained in:
Xavier Léauté 2014-05-08 18:06:26 -07:00
parent b8347cf4af
commit c40a315c81
18 changed files with 447 additions and 101 deletions

View File

@ -379,7 +379,7 @@
<dependency> <dependency>
<groupId>net.jpountz.lz4</groupId> <groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId> <artifactId>lz4</artifactId>
<version>1.1.2</version> <version>1.2.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.protobuf</groupId> <groupId>com.google.protobuf</groupId>

View File

@ -82,6 +82,10 @@
<groupId>com.davekoelle</groupId> <groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId> <artifactId>alphanum</artifactId>
</dependency> </dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.io.Files; import com.google.common.io.Files;
import io.druid.segment.data.CompressedFloatsSupplierSerializer; import io.druid.segment.data.CompressedFloatsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon; import io.druid.segment.data.IOPeon;
import java.io.File; import java.io.File;
@ -51,7 +52,8 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
public void open() throws IOException public void open() throws IOException
{ {
writer = CompressedFloatsSupplierSerializer.create( writer = CompressedFloatsSupplierSerializer.create(
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER,
CompressedObjectStrategy.CompressionStrategy.LZ4 // TODO define this somewhere else
); );
writer.open(); writer.open();

View File

@ -54,6 +54,7 @@ import io.druid.query.aggregation.ToLowerCaseAggregatorFactory;
import io.druid.segment.column.ColumnConfig; import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.ByteBufferWriter; import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedLongsSupplierSerializer; import io.druid.segment.data.CompressedLongsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseCompressedIndexedInts; import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter; import io.druid.segment.data.GenericIndexedWriter;
@ -594,7 +595,8 @@ public class IndexMerger
Iterable<Rowboat> theRows = rowMergerFn.apply(boats); Iterable<Rowboat> theRows = rowMergerFn.apply(boats);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create( CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
ioPeon, "little_end_time", IndexIO.BYTE_ORDER ioPeon, "little_end_time", IndexIO.BYTE_ORDER,
CompressedObjectStrategy.CompressionStrategy.LZ4 // TODO define this somewhere else
); );
timeWriter.open(); timeWriter.open();

View File

@ -28,14 +28,16 @@ import java.nio.FloatBuffer;
/** /**
*/ */
public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrategy<FloatBuffer> public class CompressedFloatBufferObjectStrategy extends FixedSizeCompressedObjectStrategy<FloatBuffer>
{ {
public static CompressedFloatBufferObjectStrategy getBufferForOrder(ByteOrder order) public static CompressedFloatBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{ {
return new CompressedFloatBufferObjectStrategy(order); return new CompressedFloatBufferObjectStrategy(order, compression, sizePer);
} }
private CompressedFloatBufferObjectStrategy(final ByteOrder order) private final int sizePer;
private CompressedFloatBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{ {
super( super(
order, order,
@ -64,7 +66,15 @@ public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrateg
{ {
return into.asFloatBuffer().put(from); return into.asFloatBuffer().put(from);
} }
} },
compression
); );
this.sizePer = sizePer;
}
@Override
public int getSize()
{
return sizePer;
} }
} }

View File

@ -40,22 +40,26 @@ import java.util.Iterator;
*/ */
public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats> public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
{ {
public static final byte version = 0x1; public static final byte LZF_VERSION = 0x1;
public static final byte version = 0x2;
public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2); public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2);
private final int totalSize; private final int totalSize;
private final int sizePer; private final int sizePer;
private final GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers; private final GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers;
private final CompressedObjectStrategy.CompressionStrategy compression;
CompressedFloatsIndexedSupplier( CompressedFloatsIndexedSupplier(
int totalSize, int totalSize,
int sizePer, int sizePer,
GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers,
CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
this.totalSize = totalSize; this.totalSize = totalSize;
this.sizePer = sizePer; this.sizePer = sizePer;
this.baseFloatBuffers = baseFloatBuffers; this.baseFloatBuffers = baseFloatBuffers;
this.compression = compression;
} }
public int size() public int size()
@ -151,7 +155,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
public long getSerializedSize() public long getSerializedSize()
{ {
return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4; return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4 + 1;
} }
public void writeToChannel(WritableByteChannel channel) throws IOException public void writeToChannel(WritableByteChannel channel) throws IOException
@ -159,6 +163,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
channel.write(ByteBuffer.wrap(new byte[]{version})); channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize))); channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer))); channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
baseFloatBuffers.writeToChannel(channel); baseFloatBuffers.writeToChannel(channel);
} }
@ -167,7 +172,8 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
return new CompressedFloatsIndexedSupplier( return new CompressedFloatsIndexedSupplier(
totalSize, totalSize,
sizePer, sizePer,
GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)) GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
); );
} }
@ -191,23 +197,53 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
byte versionFromBuffer = buffer.get(); byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) { if (versionFromBuffer == version) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression =
CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
return new CompressedFloatsIndexedSupplier( return new CompressedFloatsIndexedSupplier(
buffer.getInt(), totalSize,
buffer.getInt(), sizePer,
GenericIndexed.read(buffer, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)) GenericIndexed.read(
buffer,
CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
compression,
sizePer
)
),
compression
);
} else if (versionFromBuffer == LZF_VERSION) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF;
return new CompressedFloatsIndexedSupplier(
totalSize,
sizePer,
GenericIndexed.read(
buffer,
CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
compression,
sizePer
)
),
compression
); );
} }
throw new IAE("Unknown version[%s]", versionFromBuffer); throw new IAE("Unknown version[%s]", versionFromBuffer);
} }
public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order) public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression)
{ {
return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order); return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order, compression);
} }
public static CompressedFloatsIndexedSupplier fromFloatBuffer( public static CompressedFloatsIndexedSupplier fromFloatBuffer(
final FloatBuffer buffer, final int chunkFactor, final ByteOrder order final FloatBuffer buffer, final int chunkFactor, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -254,8 +290,9 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
}; };
} }
}, },
CompressedFloatBufferObjectStrategy.getBufferForOrder(order) CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, chunkFactor)
) ),
compression
); );
} }

View File

@ -36,27 +36,29 @@ import java.nio.FloatBuffer;
public class CompressedFloatsSupplierSerializer public class CompressedFloatsSupplierSerializer
{ {
public static CompressedFloatsSupplierSerializer create( public static CompressedFloatsSupplierSerializer create(
IOPeon ioPeon, final String filenameBase, final ByteOrder order IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException ) throws IOException
{ {
return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order); return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order, compression);
} }
public static CompressedFloatsSupplierSerializer create( public static CompressedFloatsSupplierSerializer create(
IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException ) throws IOException
{ {
final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer( final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer(
sizePer, sizePer,
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>( new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order) ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)
) ),
compression
); );
return retVal; return retVal;
} }
private final int sizePer; private final int sizePer;
private final GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener; private final GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener;
private final CompressedObjectStrategy.CompressionStrategy compression;
private int numInserted = 0; private int numInserted = 0;
@ -64,11 +66,13 @@ public class CompressedFloatsSupplierSerializer
public CompressedFloatsSupplierSerializer( public CompressedFloatsSupplierSerializer(
int sizePer, int sizePer,
GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener,
CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
this.sizePer = sizePer; this.sizePer = sizePer;
this.flattener = flattener; this.flattener = flattener;
this.compression = compression;
endBuffer = FloatBuffer.allocate(sizePer); endBuffer = FloatBuffer.allocate(sizePer);
endBuffer.mark(); endBuffer.mark();
@ -110,6 +114,7 @@ public class CompressedFloatsSupplierSerializer
out.write(CompressedFloatsIndexedSupplier.version); out.write(CompressedFloatsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted)); out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer)); out.write(Ints.toByteArray(sizePer));
out.write(new byte[]{compression.getId()});
ByteStreams.copy(flattener.combineStreams(), out); ByteStreams.copy(flattener.combineStreams(), out);
} }
} }

View File

@ -28,14 +28,16 @@ import java.nio.LongBuffer;
/** /**
*/ */
public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy<LongBuffer> public class CompressedLongBufferObjectStrategy extends FixedSizeCompressedObjectStrategy<LongBuffer>
{ {
public static CompressedLongBufferObjectStrategy getBufferForOrder(ByteOrder order) public static CompressedLongBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{ {
return new CompressedLongBufferObjectStrategy(order); return new CompressedLongBufferObjectStrategy(order, compression, sizePer);
} }
private CompressedLongBufferObjectStrategy(final ByteOrder order) private final int sizePer;
private CompressedLongBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{ {
super( super(
order, order,
@ -64,8 +66,15 @@ public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy
{ {
return into.asLongBuffer().put(from); return into.asLongBuffer().put(from);
} }
} },
compression
); );
this.sizePer = sizePer;
} }
@Override
public int getSize()
{
return sizePer;
}
} }

View File

@ -40,21 +40,25 @@ import java.util.Iterator;
*/ */
public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs> public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
{ {
public static final byte version = 0x1; public static final byte LZF_VERSION = 0x1;
public static final byte version = 0x2;
private final int totalSize; private final int totalSize;
private final int sizePer; private final int sizePer;
private final GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers; private final GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers;
private final CompressedObjectStrategy.CompressionStrategy compression;
CompressedLongsIndexedSupplier( CompressedLongsIndexedSupplier(
int totalSize, int totalSize,
int sizePer, int sizePer,
GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers,
CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
this.totalSize = totalSize; this.totalSize = totalSize;
this.sizePer = sizePer; this.sizePer = sizePer;
this.baseLongBuffers = baseLongBuffers; this.baseLongBuffers = baseLongBuffers;
this.compression = compression;
} }
public int size() public int size()
@ -162,7 +166,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
public long getSerializedSize() public long getSerializedSize()
{ {
return baseLongBuffers.getSerializedSize() + 1 + 4 + 4; return baseLongBuffers.getSerializedSize() + 1 + 4 + 4 + 1;
} }
public void writeToChannel(WritableByteChannel channel) throws IOException public void writeToChannel(WritableByteChannel channel) throws IOException
@ -170,15 +174,17 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
channel.write(ByteBuffer.wrap(new byte[]{version})); channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize))); channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer))); channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
baseLongBuffers.writeToChannel(channel); baseLongBuffers.writeToChannel(channel);
} }
public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order) public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression)
{ {
return new CompressedLongsIndexedSupplier( return new CompressedLongsIndexedSupplier(
totalSize, totalSize,
sizePer, sizePer,
GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order)) GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
); );
} }
@ -196,23 +202,37 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
byte versionFromBuffer = buffer.get(); byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) { if (versionFromBuffer == version) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
return new CompressedLongsIndexedSupplier( return new CompressedLongsIndexedSupplier(
buffer.getInt(), totalSize,
buffer.getInt(), sizePer,
GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order)) GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
);
} else if (versionFromBuffer == LZF_VERSION) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF;
return new CompressedLongsIndexedSupplier(
totalSize,
sizePer,
GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
); );
} }
throw new IAE("Unknown version[%s]", versionFromBuffer); throw new IAE("Unknown version[%s]", versionFromBuffer);
} }
public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder) public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression)
{ {
return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder); return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder, compression);
} }
public static CompressedLongsIndexedSupplier fromLongBuffer( public static CompressedLongsIndexedSupplier fromLongBuffer(
final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -259,8 +279,9 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
}; };
} }
}, },
CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder) CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
) ),
compression
); );
} }

View File

@ -37,20 +37,23 @@ import java.nio.LongBuffer;
public class CompressedLongsSupplierSerializer public class CompressedLongsSupplierSerializer
{ {
public static CompressedLongsSupplierSerializer create( public static CompressedLongsSupplierSerializer create(
IOPeon ioPeon, final String filenameBase, final ByteOrder order IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException ) throws IOException
{ {
final int sizePer = 0xFFFF / Longs.BYTES;
final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer( final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer(
0xFFFF / Longs.BYTES, sizePer,
new GenericIndexedWriter<ResourceHolder<LongBuffer>>( new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order) ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)
) ),
compression
); );
return retVal; return retVal;
} }
private final int sizePer; private final int sizePer;
private final GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener; private final GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener;
private final CompressedObjectStrategy.CompressionStrategy compression;
private int numInserted = 0; private int numInserted = 0;
@ -58,11 +61,13 @@ public class CompressedLongsSupplierSerializer
public CompressedLongsSupplierSerializer( public CompressedLongsSupplierSerializer(
int sizePer, int sizePer,
GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener,
CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
this.sizePer = sizePer; this.sizePer = sizePer;
this.flattener = flattener; this.flattener = flattener;
this.compression = compression;
endBuffer = LongBuffer.allocate(sizePer); endBuffer = LongBuffer.allocate(sizePer);
endBuffer.mark(); endBuffer.mark();
@ -104,6 +109,7 @@ public class CompressedLongsSupplierSerializer
out.write(CompressedLongsIndexedSupplier.version); out.write(CompressedLongsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted)); out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer)); out.write(Ints.toByteArray(sizePer));
out.write(new byte[]{compression.getId()});
ByteStreams.copy(flattener.combineStreams(), out); ByteStreams.copy(flattener.combineStreams(), out);
} }
} }

View File

@ -21,31 +21,216 @@ package io.druid.segment.data;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.google.common.collect.Maps;
import com.ning.compress.lzf.ChunkEncoder; import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk; import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFDecoder; import com.ning.compress.lzf.LZFDecoder;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.segment.CompressedPools; import io.druid.segment.CompressedPools;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.lz4.LZ4UnknownSizeDecompressor;
import java.io.IOException; import java.io.IOException;
import java.nio.Buffer; import java.nio.Buffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.Map;
/** /**
*/ */
public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrategy<ResourceHolder<T>> public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrategy<ResourceHolder<T>>
{ {
private final ByteOrder order; public static enum CompressionStrategy {
private final BufferConverter<T> converter; LZF ((byte)0x0)
{
@Override
public Decompressor getDecompressor()
{
return new LZFDecompressor();
}
@Override
public Compressor getCompressor()
{
return new LZFCompressor();
}
},
LZ4 ((byte)0x1) {
@Override
public Decompressor getDecompressor()
{
return new LZ4Decompressor();
}
@Override
public Compressor getCompressor()
{
return new LZ4Compressor();
}
};
final byte id;
CompressionStrategy(byte id) {
this.id = id;
}
public byte getId()
{
return id;
}
public abstract Compressor getCompressor();
public abstract Decompressor getDecompressor();
static final Map<Byte, CompressionStrategy> idMap = Maps.newHashMap();
static {
for(CompressionStrategy strategy : CompressionStrategy.values()) idMap.put(strategy.getId(), strategy);
}
public static CompressionStrategy forId(byte id)
{
return idMap.get(id);
}
}
public static interface Decompressor
{
/**
* Implementations of this method are expected to call out.flip() after writing to the output buffer
*
* @param in
* @param numBytes
* @param out
*/
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out);
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize);
}
public static interface Compressor
{
/**
* Currently assumes buf is an array backed ByteBuffer
*
* @param bytes
* @return
*/
public byte[] compress(byte[] bytes);
}
public static class LZFDecompressor implements Decompressor
{
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
final byte[] bytes = new byte[numBytes];
in.get(bytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
final int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes);
out.put(outputBytes, 0, numDecompressedBytes);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
{
decompress(in, numBytes, out);
}
}
public static class LZFCompressor implements Compressor
{
@Override
public byte[] compress(byte[] bytes)
{
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
CloseQuietly.close(encoder);
return chunk.getData();
}
}
public static class LZ4Decompressor implements Decompressor
{
private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
final byte[] bytes = new byte[numBytes];
in.get(bytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
out.put(outputBytes, 0, numDecompressedBytes);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
{
final byte[] bytes = new byte[numBytes];
in.get(bytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
out.put(outputBytes, 0, decompressedSize);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
}
public static class LZ4Compressor implements Compressor
{
private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
@Override
public byte[] compress(byte[] bytes)
{
final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
final byte[] out = new byte[outputBytes];
System.arraycopy(intermediate, 0, out, 0, outputBytes);
return out;
}
}
protected final ByteOrder order;
protected final BufferConverter<T> converter;
protected final Decompressor decompressor;
private final Compressor compressor;
protected CompressedObjectStrategy( protected CompressedObjectStrategy(
final ByteOrder order, final ByteOrder order,
final BufferConverter<T> converter final BufferConverter<T> converter,
final CompressionStrategy compression
) )
{ {
this.order = order; this.order = order;
this.converter = converter; this.converter = converter;
this.decompressor = compression.getDecompressor();
this.compressor = compression.getCompressor();
} }
@Override @Override
@ -58,24 +243,12 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
@Override @Override
public ResourceHolder<T> fromByteBuffer(ByteBuffer buffer, int numBytes) public ResourceHolder<T> fromByteBuffer(ByteBuffer buffer, int numBytes)
{ {
byte[] bytes = new byte[numBytes];
buffer.get(bytes);
final ResourceHolder<ByteBuffer> bufHolder = CompressedPools.getByteBuf(order); final ResourceHolder<ByteBuffer> bufHolder = CompressedPools.getByteBuf(order);
final ByteBuffer buf = bufHolder.get(); final ByteBuffer buf = bufHolder.get();
buf.position(0); buf.position(0);
buf.limit(buf.capacity()); buf.limit(buf.capacity());
try { decompress(buffer, numBytes, buf);
final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes();
byte[] outputBytes = outputBytesHolder.get();
int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes);
buf.put(outputBytes, 0, numDecompressedBytes);
buf.flip();
CloseQuietly.close(outputBytesHolder);
return new ResourceHolder<T>() return new ResourceHolder<T>()
{ {
@Override @Override
@ -91,23 +264,28 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
} }
}; };
} }
catch (IOException e) {
throw Throwables.propagate(e); protected void decompress(
} ByteBuffer buffer,
int numBytes,
ByteBuffer buf
)
{
decompressor.decompress(buffer, numBytes, buf);
} }
@Override @Override
public byte[] toBytes(ResourceHolder<T> holder) public byte[] toBytes(ResourceHolder<T> holder)
{ {
T val = holder.get(); T val = holder.get();
ByteBuffer buf = ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order); ByteBuffer buf = bufferFor(val);
converter.combine(buf, val); converter.combine(buf, val);
return compressor.compress(buf.array());
}
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder(); protected ByteBuffer bufferFor(T val)
LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length); {
CloseQuietly.close(encoder); return ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order);
return chunk.getData();
} }
@Override @Override

View File

@ -0,0 +1,50 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.data;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public abstract class FixedSizeCompressedObjectStrategy<T extends Buffer> extends CompressedObjectStrategy<T>
{
protected FixedSizeCompressedObjectStrategy(
ByteOrder order,
BufferConverter<T> converter,
CompressionStrategy compression
)
{
super(order, converter, compression);
}
public abstract int getSize();
@Override
protected ByteBuffer bufferFor(T val)
{
return ByteBuffer.allocate(converter.sizeOf(getSize())).order(order);
}
@Override
protected void decompress(ByteBuffer buffer, int numBytes, ByteBuffer buf)
{
decompressor.decompress(buffer, numBytes, buf, converter.sizeOf(getSize()));
}
}

View File

@ -37,6 +37,7 @@ import java.util.List;
*/ */
public class InMemoryCompressedFloats implements IndexedFloats public class InMemoryCompressedFloats implements IndexedFloats
{ {
public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.CompressionStrategy.LZ4;
private final CompressedFloatBufferObjectStrategy strategy; private final CompressedFloatBufferObjectStrategy strategy;
private final int sizePer; private final int sizePer;
@ -56,7 +57,11 @@ public class InMemoryCompressedFloats implements IndexedFloats
) )
{ {
this.sizePer = sizePer; this.sizePer = sizePer;
strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(order); strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
COMPRESSION,
sizePer
);
endBuffer = FloatBuffer.allocate(sizePer); endBuffer = FloatBuffer.allocate(sizePer);
endBuffer.mark(); endBuffer.mark();
@ -184,7 +189,8 @@ public class InMemoryCompressedFloats implements IndexedFloats
Arrays.<ResourceHolder<FloatBuffer>>asList(StupidResourceHolder.create(endBufCopy)) Arrays.<ResourceHolder<FloatBuffer>>asList(StupidResourceHolder.create(endBufCopy))
), ),
strategy strategy
) ),
COMPRESSION
); );
} }

View File

@ -38,6 +38,7 @@ import java.util.List;
*/ */
public class InMemoryCompressedLongs implements IndexedLongs public class InMemoryCompressedLongs implements IndexedLongs
{ {
public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.CompressionStrategy.LZ4;
private final CompressedLongBufferObjectStrategy strategy; private final CompressedLongBufferObjectStrategy strategy;
private final int sizePer; private final int sizePer;
@ -57,7 +58,11 @@ public class InMemoryCompressedLongs implements IndexedLongs
) )
{ {
this.sizePer = sizePer; this.sizePer = sizePer;
strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(order); strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(
order,
COMPRESSION,
sizePer
);
endBuffer = LongBuffer.allocate(sizePer); endBuffer = LongBuffer.allocate(sizePer);
endBuffer.mark(); endBuffer.mark();
@ -195,7 +200,8 @@ public class InMemoryCompressedLongs implements IndexedLongs
Arrays.<ResourceHolder<LongBuffer>>asList(StupidResourceHolder.create(longBufCopy)) Arrays.<ResourceHolder<LongBuffer>>asList(StupidResourceHolder.create(longBufCopy))
), ),
strategy strategy
) ),
COMPRESSION
); );
} }

View File

@ -68,7 +68,8 @@ public class CompressedFloatsIndexedSupplierTest
supplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( supplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
FloatBuffer.wrap(vals), FloatBuffer.wrap(vals),
5, 5,
ByteOrder.nativeOrder() ByteOrder.nativeOrder(),
CompressedObjectStrategy.CompressionStrategy.LZ4
); );
indexed = supplier.get(); indexed = supplier.get();
@ -82,7 +83,7 @@ public class CompressedFloatsIndexedSupplierTest
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder() FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
); );
theSupplier.writeToChannel(Channels.newChannel(baos)); theSupplier.writeToChannel(Channels.newChannel(baos));

View File

@ -39,13 +39,19 @@ public class CompressedFloatsSupplierSerializerTest
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
final ByteOrder order = ByteOrder.nativeOrder(); final ByteOrder order = ByteOrder.nativeOrder();
final int sizePer = 999;
CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer( CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer(
999, sizePer,
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>( new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
new IOPeonForTesting(), new IOPeonForTesting(),
"test", "test",
CompressedFloatBufferObjectStrategy.getBufferForOrder(order) CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
CompressedObjectStrategy.CompressionStrategy.LZ4,
sizePer
) )
),
CompressedObjectStrategy.CompressionStrategy.LZ4
); );
serializer.open(); serializer.open();

View File

@ -66,7 +66,8 @@ public class CompressedLongsIndexedSupplierTest
supplier = CompressedLongsIndexedSupplier.fromLongBuffer( supplier = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(vals), LongBuffer.wrap(vals),
5, 5,
ByteOrder.nativeOrder() ByteOrder.nativeOrder(),
CompressedObjectStrategy.CompressionStrategy.LZ4
); );
indexed = supplier.get(); indexed = supplier.get();
@ -78,7 +79,7 @@ public class CompressedLongsIndexedSupplierTest
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer( final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder() LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
); );
theSupplier.writeToChannel(Channels.newChannel(baos)); theSupplier.writeToChannel(Channels.newChannel(baos));

View File

@ -39,13 +39,15 @@ public class CompressedLongsSupplierSerializerTest
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
final ByteOrder order = ByteOrder.nativeOrder(); final ByteOrder order = ByteOrder.nativeOrder();
final int sizePer = 999;
CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer( CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer(
999, sizePer,
new GenericIndexedWriter<ResourceHolder<LongBuffer>>( new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
new IOPeonForTesting(), new IOPeonForTesting(),
"test", "test",
CompressedLongBufferObjectStrategy.getBufferForOrder(order) CompressedLongBufferObjectStrategy.getBufferForOrder(order, CompressedObjectStrategy.CompressionStrategy.LZ4, sizePer)
) ),
CompressedObjectStrategy.CompressionStrategy.LZ4
); );
serializer.open(); serializer.open();