Merge pull request #632 from metamx/lz4-compression

Change default column compression from LZF to LZ4
This commit is contained in:
fjy 2014-08-08 14:24:00 -06:00
commit 4af6a05378
19 changed files with 525 additions and 112 deletions

View File

@ -384,7 +384,7 @@
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.1.2</version>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>

View File

@ -82,6 +82,10 @@
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<!-- Tests -->

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.io.Files;
import io.druid.segment.data.CompressedFloatsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon;
import java.io.File;
@ -51,7 +52,8 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
public void open() throws IOException
{
writer = CompressedFloatsSupplierSerializer.create(
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER,
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
writer.open();

View File

@ -54,6 +54,7 @@ import io.druid.query.aggregation.ToLowerCaseAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedLongsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter;
@ -594,7 +595,8 @@ public class IndexMerger
Iterable<Rowboat> theRows = rowMergerFn.apply(boats);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
ioPeon, "little_end_time", IndexIO.BYTE_ORDER
ioPeon, "little_end_time", IndexIO.BYTE_ORDER,
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
timeWriter.open();

View File

@ -28,14 +28,14 @@ import java.nio.FloatBuffer;
/**
*/
public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrategy<FloatBuffer>
public class CompressedFloatBufferObjectStrategy extends FixedSizeCompressedObjectStrategy<FloatBuffer>
{
public static CompressedFloatBufferObjectStrategy getBufferForOrder(ByteOrder order)
public static CompressedFloatBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
return new CompressedFloatBufferObjectStrategy(order);
return new CompressedFloatBufferObjectStrategy(order, compression, sizePer);
}
private CompressedFloatBufferObjectStrategy(final ByteOrder order)
private CompressedFloatBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
super(
order,
@ -64,7 +64,9 @@ public class CompressedFloatBufferObjectStrategy extends CompressedObjectStrateg
{
return into.asFloatBuffer().put(from);
}
}
},
compression,
sizePer
);
}
}

View File

@ -40,22 +40,26 @@ import java.util.Iterator;
*/
public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
{
public static final byte version = 0x1;
public static final byte LZF_VERSION = 0x1;
public static final byte version = 0x2;
public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2);
private final int totalSize;
private final int sizePer;
private final GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers;
private final CompressedObjectStrategy.CompressionStrategy compression;
CompressedFloatsIndexedSupplier(
int totalSize,
int sizePer,
GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers
GenericIndexed<ResourceHolder<FloatBuffer>> baseFloatBuffers,
CompressedObjectStrategy.CompressionStrategy compression
)
{
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseFloatBuffers = baseFloatBuffers;
this.compression = compression;
}
public int size()
@ -151,7 +155,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
public long getSerializedSize()
{
return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4;
return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4 + 1;
}
public void writeToChannel(WritableByteChannel channel) throws IOException
@ -159,6 +163,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
baseFloatBuffers.writeToChannel(channel);
}
@ -167,7 +172,8 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
return new CompressedFloatsIndexedSupplier(
totalSize,
sizePer,
GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order))
GenericIndexed.fromIterable(baseFloatBuffers, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
);
}
@ -191,23 +197,53 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression =
CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
return new CompressedFloatsIndexedSupplier(
buffer.getInt(),
buffer.getInt(),
GenericIndexed.read(buffer, CompressedFloatBufferObjectStrategy.getBufferForOrder(order))
totalSize,
sizePer,
GenericIndexed.read(
buffer,
CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
compression,
sizePer
)
),
compression
);
} else if (versionFromBuffer == LZF_VERSION) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF;
return new CompressedFloatsIndexedSupplier(
totalSize,
sizePer,
GenericIndexed.read(
buffer,
CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
compression,
sizePer
)
),
compression
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order)
public static CompressedFloatsIndexedSupplier fromFloatBuffer(FloatBuffer buffer, final ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression)
{
return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order);
return fromFloatBuffer(buffer, MAX_FLOATS_IN_BUFFER, order, compression);
}
public static CompressedFloatsIndexedSupplier fromFloatBuffer(
final FloatBuffer buffer, final int chunkFactor, final ByteOrder order
final FloatBuffer buffer, final int chunkFactor, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
)
{
Preconditions.checkArgument(
@ -254,8 +290,9 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
};
}
},
CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
)
CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, chunkFactor)
),
compression
);
}

View File

@ -36,27 +36,29 @@ import java.nio.FloatBuffer;
public class CompressedFloatsSupplierSerializer
{
public static CompressedFloatsSupplierSerializer create(
IOPeon ioPeon, final String filenameBase, final ByteOrder order
IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException
{
return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order);
return create(ioPeon, filenameBase, CompressedFloatsIndexedSupplier.MAX_FLOATS_IN_BUFFER, order, compression);
}
public static CompressedFloatsSupplierSerializer create(
IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order
IOPeon ioPeon, final String filenameBase, final int sizePer, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException
{
final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer(
sizePer,
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
)
ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)
),
compression
);
return retVal;
}
private final int sizePer;
private final GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener;
private final CompressedObjectStrategy.CompressionStrategy compression;
private int numInserted = 0;
@ -64,11 +66,13 @@ public class CompressedFloatsSupplierSerializer
public CompressedFloatsSupplierSerializer(
int sizePer,
GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener
GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener,
CompressedObjectStrategy.CompressionStrategy compression
)
{
this.sizePer = sizePer;
this.flattener = flattener;
this.compression = compression;
endBuffer = FloatBuffer.allocate(sizePer);
endBuffer.mark();
@ -110,6 +114,7 @@ public class CompressedFloatsSupplierSerializer
out.write(CompressedFloatsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer));
out.write(new byte[]{compression.getId()});
ByteStreams.copy(flattener.combineStreams(), out);
}
}

View File

@ -28,14 +28,14 @@ import java.nio.LongBuffer;
/**
*/
public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy<LongBuffer>
public class CompressedLongBufferObjectStrategy extends FixedSizeCompressedObjectStrategy<LongBuffer>
{
public static CompressedLongBufferObjectStrategy getBufferForOrder(ByteOrder order)
public static CompressedLongBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
return new CompressedLongBufferObjectStrategy(order);
return new CompressedLongBufferObjectStrategy(order, compression, sizePer);
}
private CompressedLongBufferObjectStrategy(final ByteOrder order)
private CompressedLongBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
{
super(
order,
@ -64,8 +64,9 @@ public class CompressedLongBufferObjectStrategy extends CompressedObjectStrategy
{
return into.asLongBuffer().put(from);
}
}
},
compression,
sizePer
);
}
}

View File

@ -40,21 +40,25 @@ import java.util.Iterator;
*/
public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
{
public static final byte version = 0x1;
public static final byte LZF_VERSION = 0x1;
public static final byte version = 0x2;
private final int totalSize;
private final int sizePer;
private final GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers;
private final CompressedObjectStrategy.CompressionStrategy compression;
CompressedLongsIndexedSupplier(
int totalSize,
int sizePer,
GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers
GenericIndexed<ResourceHolder<LongBuffer>> baseLongBuffers,
CompressedObjectStrategy.CompressionStrategy compression
)
{
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseLongBuffers = baseLongBuffers;
this.compression = compression;
}
public int size()
@ -162,7 +166,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
public long getSerializedSize()
{
return baseLongBuffers.getSerializedSize() + 1 + 4 + 4;
return baseLongBuffers.getSerializedSize() + 1 + 4 + 4 + 1;
}
public void writeToChannel(WritableByteChannel channel) throws IOException
@ -170,15 +174,17 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
baseLongBuffers.writeToChannel(channel);
}
public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order)
public CompressedLongsIndexedSupplier convertByteOrder(ByteOrder order, CompressedObjectStrategy.CompressionStrategy compression)
{
return new CompressedLongsIndexedSupplier(
totalSize,
sizePer,
GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order))
GenericIndexed.fromIterable(baseLongBuffers, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
);
}
@ -196,23 +202,37 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
return new CompressedLongsIndexedSupplier(
buffer.getInt(),
buffer.getInt(),
GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order))
totalSize,
sizePer,
GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
);
} else if (versionFromBuffer == LZF_VERSION) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.LZF;
return new CompressedLongsIndexedSupplier(
totalSize,
sizePer,
GenericIndexed.read(buffer, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
compression
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder)
public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression)
{
return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder);
return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder, compression);
}
public static CompressedLongsIndexedSupplier fromLongBuffer(
final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder
final LongBuffer buffer, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
)
{
Preconditions.checkArgument(
@ -259,8 +279,9 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
};
}
},
CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder)
)
CompressedLongBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
),
compression
);
}

View File

@ -37,20 +37,23 @@ import java.nio.LongBuffer;
public class CompressedLongsSupplierSerializer
{
public static CompressedLongsSupplierSerializer create(
IOPeon ioPeon, final String filenameBase, final ByteOrder order
IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression
) throws IOException
{
final int sizePer = 0xFFFF / Longs.BYTES;
final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer(
0xFFFF / Longs.BYTES,
sizePer,
new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order)
)
ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)
),
compression
);
return retVal;
}
private final int sizePer;
private final GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener;
private final CompressedObjectStrategy.CompressionStrategy compression;
private int numInserted = 0;
@ -58,11 +61,13 @@ public class CompressedLongsSupplierSerializer
public CompressedLongsSupplierSerializer(
int sizePer,
GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener
GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener,
CompressedObjectStrategy.CompressionStrategy compression
)
{
this.sizePer = sizePer;
this.flattener = flattener;
this.compression = compression;
endBuffer = LongBuffer.allocate(sizePer);
endBuffer.mark();
@ -104,6 +109,7 @@ public class CompressedLongsSupplierSerializer
out.write(CompressedLongsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer));
out.write(new byte[]{compression.getId()});
ByteStreams.copy(flattener.combineStreams(), out);
}
}

View File

@ -21,31 +21,218 @@ package io.druid.segment.data;
import com.google.common.base.Throwables;
import com.metamx.common.guava.CloseQuietly;
import com.google.common.collect.Maps;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFDecoder;
import io.druid.collections.ResourceHolder;
import io.druid.segment.CompressedPools;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.lz4.LZ4UnknownSizeDecompressor;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
/**
*/
public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrategy<ResourceHolder<T>>
{
private final ByteOrder order;
private final BufferConverter<T> converter;
public static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.LZ4;
public static enum CompressionStrategy {
LZF ((byte)0x0)
{
@Override
public Decompressor getDecompressor()
{
return new LZFDecompressor();
}
@Override
public Compressor getCompressor()
{
return new LZFCompressor();
}
},
LZ4 ((byte)0x1) {
@Override
public Decompressor getDecompressor()
{
return new LZ4Decompressor();
}
@Override
public Compressor getCompressor()
{
return new LZ4Compressor();
}
};
final byte id;
CompressionStrategy(byte id) {
this.id = id;
}
public byte getId()
{
return id;
}
public abstract Compressor getCompressor();
public abstract Decompressor getDecompressor();
static final Map<Byte, CompressionStrategy> idMap = Maps.newHashMap();
static {
for(CompressionStrategy strategy : CompressionStrategy.values()) idMap.put(strategy.getId(), strategy);
}
public static CompressionStrategy forId(byte id)
{
return idMap.get(id);
}
}
public static interface Decompressor
{
/**
* Implementations of this method are expected to call out.flip() after writing to the output buffer
*
* @param in
* @param numBytes
* @param out
*/
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out);
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize);
}
public static interface Compressor
{
/**
* Currently assumes buf is an array backed ByteBuffer
*
* @param bytes
* @return
*/
public byte[] compress(byte[] bytes);
}
public static class LZFDecompressor implements Decompressor
{
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
final byte[] bytes = new byte[numBytes];
in.get(bytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
final int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes);
out.put(outputBytes, 0, numDecompressedBytes);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
{
decompress(in, numBytes, out);
}
}
public static class LZFCompressor implements Compressor
{
@Override
public byte[] compress(byte[] bytes)
{
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
LZFChunk chunk = encoder.get().encodeChunk(bytes, 0, bytes.length);
CloseQuietly.close(encoder);
return chunk.getData();
}
}
public static class LZ4Decompressor implements Decompressor
{
private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
final byte[] bytes = new byte[numBytes];
in.get(bytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
out.put(outputBytes, 0, numDecompressedBytes);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
{
final byte[] bytes = new byte[numBytes];
in.get(bytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
out.put(outputBytes, 0, decompressedSize);
out.flip();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
}
public static class LZ4Compressor implements Compressor
{
private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
@Override
public byte[] compress(byte[] bytes)
{
final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
final byte[] out = new byte[outputBytes];
System.arraycopy(intermediate, 0, out, 0, outputBytes);
return out;
}
}
protected final ByteOrder order;
protected final BufferConverter<T> converter;
protected final Decompressor decompressor;
private final Compressor compressor;
protected CompressedObjectStrategy(
final ByteOrder order,
final BufferConverter<T> converter
final BufferConverter<T> converter,
final CompressionStrategy compression
)
{
this.order = order;
this.converter = converter;
this.decompressor = compression.getDecompressor();
this.compressor = compression.getCompressor();
}
@Override
@ -58,24 +245,12 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
@Override
public ResourceHolder<T> fromByteBuffer(ByteBuffer buffer, int numBytes)
{
byte[] bytes = new byte[numBytes];
buffer.get(bytes);
final ResourceHolder<ByteBuffer> bufHolder = CompressedPools.getByteBuf(order);
final ByteBuffer buf = bufHolder.get();
buf.position(0);
buf.limit(buf.capacity());
try {
final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes();
byte[] outputBytes = outputBytesHolder.get();
int numDecompressedBytes = LZFDecoder.decode(bytes, outputBytes);
buf.put(outputBytes, 0, numDecompressedBytes);
buf.flip();
CloseQuietly.close(outputBytesHolder);
decompress(buffer, numBytes, buf);
return new ResourceHolder<T>()
{
@Override
@ -91,23 +266,28 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
}
};
}
catch (IOException e) {
throw Throwables.propagate(e);
}
protected void decompress(
ByteBuffer buffer,
int numBytes,
ByteBuffer buf
)
{
decompressor.decompress(buffer, numBytes, buf);
}
@Override
public byte[] toBytes(ResourceHolder<T> holder)
{
T val = holder.get();
ByteBuffer buf = ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order);
ByteBuffer buf = bufferFor(val);
converter.combine(buf, val);
return compressor.compress(buf.array());
}
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length);
CloseQuietly.close(encoder);
return chunk.getData();
protected ByteBuffer bufferFor(T val)
{
return ByteBuffer.allocate(converter.sizeOf(val.remaining())).order(order);
}
@Override

View File

@ -0,0 +1,56 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.data;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public abstract class FixedSizeCompressedObjectStrategy<T extends Buffer> extends CompressedObjectStrategy<T>
{
private final int sizePer;
protected FixedSizeCompressedObjectStrategy(
ByteOrder order,
BufferConverter<T> converter,
CompressionStrategy compression,
int sizePer
)
{
super(order, converter, compression);
this.sizePer = sizePer;
}
public int getSize() {
return sizePer;
}
@Override
protected ByteBuffer bufferFor(T val)
{
return ByteBuffer.allocate(converter.sizeOf(getSize())).order(order);
}
@Override
protected void decompress(ByteBuffer buffer, int numBytes, ByteBuffer buf)
{
decompressor.decompress(buffer, numBytes, buf, converter.sizeOf(getSize()));
}
}

View File

@ -37,6 +37,7 @@ import java.util.List;
*/
public class InMemoryCompressedFloats implements IndexedFloats
{
public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY;
private final CompressedFloatBufferObjectStrategy strategy;
private final int sizePer;
@ -56,7 +57,11 @@ public class InMemoryCompressedFloats implements IndexedFloats
)
{
this.sizePer = sizePer;
strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(order);
strategy = CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
COMPRESSION,
sizePer
);
endBuffer = FloatBuffer.allocate(sizePer);
endBuffer.mark();
@ -184,7 +189,8 @@ public class InMemoryCompressedFloats implements IndexedFloats
Arrays.<ResourceHolder<FloatBuffer>>asList(StupidResourceHolder.create(endBufCopy))
),
strategy
)
),
COMPRESSION
);
}

View File

@ -38,6 +38,7 @@ import java.util.List;
*/
public class InMemoryCompressedLongs implements IndexedLongs
{
public static final CompressedObjectStrategy.CompressionStrategy COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY;
private final CompressedLongBufferObjectStrategy strategy;
private final int sizePer;
@ -57,7 +58,11 @@ public class InMemoryCompressedLongs implements IndexedLongs
)
{
this.sizePer = sizePer;
strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(order);
strategy = CompressedLongBufferObjectStrategy.getBufferForOrder(
order,
COMPRESSION,
sizePer
);
endBuffer = LongBuffer.allocate(sizePer);
endBuffer.mark();
@ -195,7 +200,8 @@ public class InMemoryCompressedLongs implements IndexedLongs
Arrays.<ResourceHolder<LongBuffer>>asList(StupidResourceHolder.create(longBufCopy))
),
strategy
)
),
COMPRESSION
);
}

View File

@ -25,6 +25,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -36,10 +38,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class CompressedFloatsIndexedSupplierTest
@RunWith(Parameterized.class)
public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest
{
public CompressedFloatsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
super(compressionStrategy);
}
private IndexedFloats indexed;
private CompressedFloatsIndexedSupplier supplier;
private float[] vals;
@ -68,7 +74,8 @@ public class CompressedFloatsIndexedSupplierTest
supplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
FloatBuffer.wrap(vals),
5,
ByteOrder.nativeOrder()
ByteOrder.nativeOrder(),
compressionStrategy
);
indexed = supplier.get();
@ -82,7 +89,7 @@ public class CompressedFloatsIndexedSupplierTest
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder()
FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy
);
theSupplier.writeToChannel(Channels.newChannel(baos));

View File

@ -23,6 +23,8 @@ import com.google.common.io.OutputSupplier;
import io.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -31,21 +33,31 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.FloatBuffer;
/**
*/
public class CompressedFloatsSupplierSerializerTest
@RunWith(Parameterized.class)
public class CompressedFloatsSupplierSerializerTest extends CompressionStrategyTest
{
public CompressedFloatsSupplierSerializerTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
super(compressionStrategy);
}
@Test
public void testSanity() throws Exception
{
final ByteOrder order = ByteOrder.nativeOrder();
final int sizePer = 999;
CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer(
999,
sizePer,
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
new IOPeonForTesting(),
"test",
CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
CompressedFloatBufferObjectStrategy.getBufferForOrder(
order,
compressionStrategy,
sizePer
)
),
compressionStrategy
);
serializer.open();

View File

@ -38,8 +38,13 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class CompressedLongsIndexedSupplierTest
public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
{
public CompressedLongsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
super(compressionStrategy);
}
private IndexedLongs indexed;
private CompressedLongsIndexedSupplier supplier;
private long[] vals;
@ -66,7 +71,8 @@ public class CompressedLongsIndexedSupplierTest
supplier = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(vals),
5,
ByteOrder.nativeOrder()
ByteOrder.nativeOrder(),
compressionStrategy
);
indexed = supplier.get();
@ -78,7 +84,7 @@ public class CompressedLongsIndexedSupplierTest
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder()
LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy
);
theSupplier.writeToChannel(Channels.newChannel(baos));

View File

@ -23,6 +23,8 @@ import com.google.common.io.OutputSupplier;
import io.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -31,21 +33,27 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
/**
*/
public class CompressedLongsSupplierSerializerTest
@RunWith(Parameterized.class)
public class CompressedLongsSupplierSerializerTest extends CompressionStrategyTest
{
public CompressedLongsSupplierSerializerTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
super(compressionStrategy);
}
@Test
public void testSanity() throws Exception
{
final ByteOrder order = ByteOrder.nativeOrder();
final int sizePer = 999;
CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer(
999,
sizePer,
new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
new IOPeonForTesting(),
"test",
CompressedLongBufferObjectStrategy.getBufferForOrder(order)
)
CompressedLongBufferObjectStrategy.getBufferForOrder(order, compressionStrategy, sizePer)
),
compressionStrategy
);
serializer.open();

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.junit.runners.Parameterized;
import java.util.Arrays;
public class CompressionStrategyTest
{
@Parameterized.Parameters
public static Iterable<Object[]> compressionStrategies()
{
return Iterables.transform(
Arrays.asList(CompressedObjectStrategy.CompressionStrategy.values()),
new Function<CompressedObjectStrategy.CompressionStrategy, Object[]>()
{
@Override
public Object[] apply(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
return new Object[]{compressionStrategy};
}
}
);
}
protected final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
public CompressionStrategyTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
this.compressionStrategy = compressionStrategy;
}
}