Merge pull request #1354 from metamx/multi-valued-dimension-compression

Enabling compression for multiValued dimension
This commit is contained in:
Xavier Léauté 2015-05-26 23:43:53 -07:00
commit 1a3f04f0ed
9 changed files with 797 additions and 58 deletions

154
benchmarks/pom.xml Normal file
View File

@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>druid-benchmarks</artifactId>
<name>druid-benchmarks</name>
<packaging>jar</packaging>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.8.0-SNAPSHOT</version>
</parent>
<prerequisites>
<maven>3.0</maven>
</prerequisites>
<dependencies>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jmh.version>1.9.2</jmh.version>
<javac.target>1.6</javac.target>
<uberjar.name>benchmarks</uberjar.name>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<compilerVersion>${javac.target}</compilerVersion>
<source>${javac.target}</source>
<target>${javac.target}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${uberjar.name}</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<!--
Shading signed JARs will fail without this.
http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
-->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.1</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.6</version>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.3</version>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@ -0,0 +1,189 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.segment.CompressedVSizeIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.WritableSupplier;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
public class CompressedVSizeIndexedBenchmark
{
private IndexedMultivalue<IndexedInts> uncompressed;
private IndexedMultivalue<IndexedInts> compressed;
@Param({"1", "2", "3", "4"})
int bytes;
@Param({"5", "10"})
int valuesPerRowBound;
@Setup
public void setup() throws IOException
{
Random rand = new Random(0);
List<int[]> rows = Lists.newArrayList();
final int bound = 1 << bytes;
for (int i = 0; i < 0x10000; i++) {
int[] row = new int[rand.nextInt(valuesPerRowBound)];
int count = rand.nextInt(valuesPerRowBound);
for (int j = 0; j < row.length; j++) {
row[j] = rand.nextInt(bound);
}
rows.add(row);
}
final ByteBuffer bufferCompressed = serialize(
CompressedVSizeIndexedSupplier.fromIterable(
Iterables.transform(
rows,
new Function<int[], IndexedInts>()
{
@Override
public IndexedInts apply(int[] input)
{
return VSizeIndexedInts.fromArray(input, 20);
}
}
),
bound - 1,
ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
)
);
this.compressed = CompressedVSizeIndexedSupplier.fromByteBuffer(
bufferCompressed, ByteOrder.nativeOrder()
).get();
final ByteBuffer bufferUncompressed = serialize(
VSizeIndexed.fromIterable(
Iterables.transform(
rows,
new Function<int[], VSizeIndexedInts>()
{
@Override
public VSizeIndexedInts apply(int[] input)
{
return VSizeIndexedInts.fromArray(input, 20);
}
}
)
).asWritableSupplier()
);
this.uncompressed = VSizeIndexed.readFromByteBuffer(bufferUncompressed);
}
private static ByteBuffer serialize(WritableSupplier<IndexedMultivalue<IndexedInts>> writableSupplier)
throws IOException
{
final ByteBuffer buffer = ByteBuffer.allocateDirect((int) writableSupplier.getSerializedSize());
WritableByteChannel channel = new WritableByteChannel()
{
@Override
public int write(ByteBuffer src) throws IOException
{
int size = src.remaining();
buffer.put(src);
return size;
}
@Override
public boolean isOpen()
{
return true;
}
@Override
public void close() throws IOException
{
}
};
writableSupplier.writeToChannel(channel);
buffer.rewind();
return buffer;
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void uncompressed(Blackhole blackhole)
{
final int size = uncompressed.size();
for (int i = 0; i < size; ++i) {
IndexedInts row = uncompressed.get(i);
for (int j = 0; j < row.size(); j++) {
blackhole.consume(row.get(j));
}
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void compressed(Blackhole blackhole)
{
final int size = compressed.size();
for (int i = 0; i < size; ++i) {
IndexedInts row = compressed.get(i);
for (int j = 0; j < row.size(); j++) {
blackhole.consume(row.get(j));
}
}
}
public static void main(String... args) throws IOException, RunnerException
{
Options opt = new OptionsBuilder()
.include(CompressedVSizeIndexedBenchmark.class.getSimpleName())
.forks(1)
.build();
new Runner(opt).run();
}
}

View File

@ -83,6 +83,7 @@
<module>server</module> <module>server</module>
<module>services</module> <module>services</module>
<module>integration-tests</module> <module>integration-tests</module>
<module>benchmarks</module>
<module>aws-common</module> <module>aws-common</module>
<!-- Non-default modules --> <!-- Non-default modules -->
<module>extensions/cassandra-storage</module> <module>extensions/cassandra-storage</module>

View File

@ -0,0 +1,230 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.metamx.common.IAE;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIntsIterator;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.WritableSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Format -
* byte 1 - version
* offsets - indexed integers of length num of rows + 1 representing offsets of starting index of first element of each row in values index and last element equal to length of values column,
* the last element in the offsets represents the total length of values column.
* values - indexed integer representing values in each row
*/
public class CompressedVSizeIndexedSupplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
{
private static final byte version = 0x2;
//offsets - indexed integers of length num of rows + 1 representing offsets of starting index of first element of each row in values index
// last element represents the length of values column
private final CompressedVSizeIntsIndexedSupplier offsetSupplier;
//values - indexed integers representing actual values in each row
private final CompressedVSizeIntsIndexedSupplier valueSupplier;
CompressedVSizeIndexedSupplier(
CompressedVSizeIntsIndexedSupplier offsetSupplier,
CompressedVSizeIntsIndexedSupplier valueSupplier
)
{
this.offsetSupplier = offsetSupplier;
this.valueSupplier = valueSupplier;
}
public long getSerializedSize()
{
return 1 + offsetSupplier.getSerializedSize() + valueSupplier.getSerializedSize();
}
public void writeToChannel(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{version}));
offsetSupplier.writeToChannel(channel);
valueSupplier.writeToChannel(channel);
}
public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) {
CompressedVSizeIntsIndexedSupplier offsetSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
buffer,
order
);
CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
buffer,
order
);
return new CompressedVSizeIndexedSupplier(offsetSupplier, valueSupplier);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static CompressedVSizeIndexedSupplier fromIterable(
Iterable<IndexedInts> objectsIterable,
int maxValue,
final ByteOrder byteOrder,
CompressedObjectStrategy.CompressionStrategy compression
)
{
Iterator<IndexedInts> objects = objectsIterable.iterator();
List<Integer> offsetList = new ArrayList<>();
List<Integer> values = new ArrayList<>();
int offset = 0;
while (objects.hasNext()) {
IndexedInts next = objects.next();
offsetList.add(offset);
for (int i = 0; i < next.size(); i++) {
values.add(next.get(i));
}
offset += next.size();
}
offsetList.add(offset);
int offsetMax = offset;
CompressedVSizeIntsIndexedSupplier headerSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
offsetList,
offsetMax,
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(offsetMax),
byteOrder,
compression
);
CompressedVSizeIntsIndexedSupplier valuesSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
values,
maxValue,
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
byteOrder,
compression
);
return new CompressedVSizeIndexedSupplier(headerSupplier, valuesSupplier);
}
@Override
public IndexedMultivalue<IndexedInts> get()
{
return new CompressedVSizeIndexed(offsetSupplier.get(), valueSupplier.get());
}
public static class CompressedVSizeIndexed implements IndexedMultivalue<IndexedInts>
{
private final IndexedInts offsets;
private final IndexedInts values;
CompressedVSizeIndexed(IndexedInts offsets, IndexedInts values)
{
this.offsets = offsets;
this.values = values;
}
@Override
public void close() throws IOException
{
offsets.close();
values.close();
}
@Override
public Class<? extends IndexedInts> getClazz()
{
return IndexedInts.class;
}
@Override
public int size()
{
return offsets.size() - 1;
}
@Override
public IndexedInts get(int index)
{
final int offset = offsets.get(index);
final int size = offsets.get(index + 1) - offset;
return new IndexedInts()
{
@Override
public int size()
{
return size;
}
@Override
public int get(int index)
{
if (index >= size) {
throw new IllegalArgumentException(String.format("Index[%s] >= size[%s]", index, size));
}
return values.get(index + offset);
}
@Override
public void fill(int index, int[] toFill)
{
throw new UnsupportedOperationException("fill not supported");
}
@Override
public void close() throws IOException
{
// no-op
}
@Override
public Iterator<Integer> iterator()
{
return new IndexedIntsIterator(this);
}
};
}
@Override
public int indexOf(IndexedInts value)
{
throw new UnsupportedOperationException("Reverse lookup not allowed.");
}
@Override
public Iterator<IndexedInts> iterator()
{
return IndexedIterable.create(this).iterator();
}
}
}

View File

@ -718,13 +718,16 @@ public class IndexIO
} else { } else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
} }
} else if (compressionStrategy != null) {
columnPartBuilder.withMultiValuedColumn(
CompressedVSizeIndexedSupplier.fromIterable(
multiValCol,
dictionary.size(),
BYTE_ORDER,
compressionStrategy
)
);
} else { } else {
if (compressionStrategy != null) {
log.info(
"Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
dimension
);
}
columnPartBuilder.withMultiValuedColumn(multiValCol); columnPartBuilder.withMultiValuedColumn(multiValCol);
} }

View File

@ -1188,16 +1188,20 @@ public class IndexMaker
} else { } else {
dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
} }
} else if (compressionStrategy != null) {
dimPartBuilder.withMultiValuedColumn(
CompressedVSizeIndexedSupplier.fromIterable(
multiValCol,
dictionary.size(),
IndexIO.BYTE_ORDER,
compressionStrategy
)
);
} else { } else {
if (compressionStrategy != null) {
log.info(
"Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
dimension
);
}
dimPartBuilder.withMultiValuedColumn(multiValCol); dimPartBuilder.withMultiValuedColumn(multiValCol);
} }
writeColumn( writeColumn(
v9Smoosher, v9Smoosher,
dimPartBuilder.build(), dimPartBuilder.build(),

View File

@ -69,7 +69,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
this.compression = compression; this.compression = compression;
this.numBytes = numBytes; this.numBytes = numBytes;
this.bigEndianShift = Integer.SIZE - (numBytes << 3); // numBytes * 8 this.bigEndianShift = Integer.SIZE - (numBytes << 3); // numBytes * 8
this.littleEndianMask = (int)((1L << (numBytes << 3)) - 1); // set numBytes * 8 lower bits to 1 this.littleEndianMask = (int) ((1L << (numBytes << 3)) - 1); // set numBytes * 8 lower bits to 1
} }
public static int maxIntsInBufferForBytes(int numBytes) public static int maxIntsInBufferForBytes(int numBytes)
@ -107,7 +107,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
public IndexedInts get() public IndexedInts get()
{ {
// optimized versions for int, short, and byte columns // optimized versions for int, short, and byte columns
if(numBytes == Ints.BYTES) { if (numBytes == Ints.BYTES) {
return new CompressedFullSizeIndexedInts(); return new CompressedFullSizeIndexedInts();
} else if (numBytes == Shorts.BYTES) { } else if (numBytes == Shorts.BYTES) {
return new CompressedShortSizeIndexedInts(); return new CompressedShortSizeIndexedInts();
@ -157,7 +157,9 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
final int sizePer = buffer.getInt(); final int sizePer = buffer.getInt();
final int chunkBytes = sizePer * numBytes + bufferPadding(numBytes); final int chunkBytes = sizePer * numBytes + bufferPadding(numBytes);
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get()); final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(
buffer.get()
);
return new CompressedVSizeIntsIndexedSupplier( return new CompressedVSizeIntsIndexedSupplier(
totalSize, totalSize,
@ -176,7 +178,11 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
} }
public static CompressedVSizeIntsIndexedSupplier fromList( public static CompressedVSizeIntsIndexedSupplier fromList(
final List<Integer> list, final int maxValue, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression final List<Integer> list,
final int maxValue,
final int chunkFactor,
final ByteOrder byteOrder,
CompressedObjectStrategy.CompressionStrategy compression
) )
{ {
final int numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue); final int numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue);
@ -254,7 +260,8 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
); );
} }
private class CompressedFullSizeIndexedInts extends CompressedVSizeIndexedInts { private class CompressedFullSizeIndexedInts extends CompressedVSizeIndexedInts
{
IntBuffer intBuffer; IntBuffer intBuffer;
@Override @Override
@ -271,7 +278,8 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
} }
} }
private class CompressedShortSizeIndexedInts extends CompressedVSizeIndexedInts { private class CompressedShortSizeIndexedInts extends CompressedVSizeIndexedInts
{
ShortBuffer shortBuffer; ShortBuffer shortBuffer;
@Override @Override
@ -289,7 +297,8 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
} }
} }
private class CompressedByteSizeIndexedInts extends CompressedVSizeIndexedInts { private class CompressedByteSizeIndexedInts extends CompressedVSizeIndexedInts
{
@Override @Override
protected int _get(int index) protected int _get(int index)
{ {
@ -318,10 +327,11 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
/** /**
* Returns the value at the given index into the column. * Returns the value at the given index into the column.
* * <p/>
* Assumes the number of entries in each decompression buffers is a power of two. * Assumes the number of entries in each decompression buffers is a power of two.
* *
* @param index index of the value in the column * @param index index of the value in the column
*
* @return the value at the given index * @return the value at the given index
*/ */
@Override @Override
@ -341,6 +351,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
* Returns the value at the given index in the current decompression buffer * Returns the value at the given index in the current decompression buffer
* *
* @param index index of the value in the curent buffer * @param index index of the value in the curent buffer
*
* @return the value at the given index * @return the value at the given index
*/ */
protected int _get(final int index) protected int _get(final int index)

View File

@ -24,7 +24,7 @@ import com.google.common.primitives.Ints;
import com.metamx.collections.bitmap.ImmutableBitmap; import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import io.druid.segment.CompressedVSizeIndexedSupplier;
import io.druid.segment.column.ColumnBuilder; import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.column.ColumnConfig; import io.druid.segment.column.ColumnConfig;
import io.druid.segment.column.ValueType; import io.druid.segment.column.ValueType;
@ -52,9 +52,12 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
{ {
private static final int NO_FLAGS = 0; private static final int NO_FLAGS = 0;
enum Feature { enum Feature
{
MULTI_VALUE; MULTI_VALUE;
public boolean isSet(int flags) { return (getMask() & flags) != 0; } public boolean isSet(int flags) { return (getMask() & flags) != 0; }
public int getMask() { return (1 << ordinal()); } public int getMask() { return (1 << ordinal()); }
} }
@ -64,18 +67,22 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
UNCOMPRESSED_MULTI_VALUE, // 0x1 UNCOMPRESSED_MULTI_VALUE, // 0x1
COMPRESSED; // 0x2 COMPRESSED; // 0x2
public static VERSION fromByte(byte b) {
public static VERSION fromByte(byte b)
{
final VERSION[] values = VERSION.values(); final VERSION[] values = VERSION.values();
Preconditions.checkArgument(b < values.length, "Unsupported dictionary column version[%s]", b); Preconditions.checkArgument(b < values.length, "Unsupported dictionary column version[%s]", b);
return values[b]; return values[b];
} }
public byte asByte() { public byte asByte()
return (byte)this.ordinal(); {
return (byte) this.ordinal();
} }
} }
public static class Builder { public static class Builder
{
private VERSION version = null; private VERSION version = null;
private int flags = NO_FLAGS; private int flags = NO_FLAGS;
private GenericIndexed<String> dictionary = null; private GenericIndexed<String> dictionary = null;
@ -145,6 +152,15 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
return this; return this;
} }
public Builder withMultiValuedColumn(CompressedVSizeIndexedSupplier multiValuedColumn)
{
Preconditions.checkState(singleValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
this.version = VERSION.COMPRESSED;
this.flags |= Feature.MULTI_VALUE.getMask();
this.multiValuedColumn = multiValuedColumn;
return this;
}
public DictionaryEncodedColumnPartSerde build() public DictionaryEncodedColumnPartSerde build()
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -165,6 +181,8 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
byteOrder byteOrder
); );
} }
} }
public static Builder builder() public static Builder builder()
@ -265,7 +283,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
public void write(WritableByteChannel channel) throws IOException public void write(WritableByteChannel channel) throws IOException
{ {
channel.write(ByteBuffer.wrap(new byte[]{version.asByte()})); channel.write(ByteBuffer.wrap(new byte[]{version.asByte()}));
if(version.compareTo(VERSION.COMPRESSED) >= 0) { if (version.compareTo(VERSION.COMPRESSED) >= 0) {
channel.write(ByteBuffer.wrap(Ints.toByteArray(flags))); channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
} }
@ -304,18 +322,15 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
final VERSION rVersion = VERSION.fromByte(buffer.get()); final VERSION rVersion = VERSION.fromByte(buffer.get());
final int rFlags; final int rFlags;
if(rVersion.compareTo(VERSION.COMPRESSED) >= 0 ) { if (rVersion.compareTo(VERSION.COMPRESSED) >= 0) {
rFlags = buffer.getInt(); rFlags = buffer.getInt();
} else { } else {
rFlags = rVersion.equals(VERSION.UNCOMPRESSED_MULTI_VALUE) ? rFlags = rVersion.equals(VERSION.UNCOMPRESSED_MULTI_VALUE) ?
Feature.MULTI_VALUE.getMask() : Feature.MULTI_VALUE.getMask() :
NO_FLAGS; NO_FLAGS;
} }
final boolean hasMultipleValues = Feature.MULTI_VALUE.isSet(rFlags); final boolean hasMultipleValues = Feature.MULTI_VALUE.isSet(rFlags);
if(rVersion.equals(VERSION.COMPRESSED) && hasMultipleValues) {
throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns");
}
final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.STRING_STRATEGY); final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.STRING_STRATEGY);
builder.setType(ValueType.STRING); builder.setType(ValueType.STRING);
@ -323,13 +338,12 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
final WritableSupplier<IndexedInts> rSingleValuedColumn; final WritableSupplier<IndexedInts> rSingleValuedColumn;
final WritableSupplier<IndexedMultivalue<IndexedInts>> rMultiValuedColumn; final WritableSupplier<IndexedMultivalue<IndexedInts>> rMultiValuedColumn;
if (rVersion.compareTo(VERSION.COMPRESSED) >= 0) { if (hasMultipleValues) {
rSingleValuedColumn = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder); rMultiValuedColumn = readMultiValuedColum(rVersion, buffer);
rMultiValuedColumn = null; rSingleValuedColumn = null;
} else { } else {
Pair<WritableSupplier<IndexedInts>, VSizeIndexed> cols = readUncompressed(rVersion, buffer); rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer);
rSingleValuedColumn = cols.lhs; rMultiValuedColumn = null;
rMultiValuedColumn = cols.rhs == null ? null : cols.rhs.asWritableSupplier();
} }
builder.setHasMultipleValues(hasMultipleValues) builder.setHasMultipleValues(hasMultipleValues)
@ -374,30 +388,26 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
); );
} }
private static Pair<WritableSupplier<IndexedInts>, VSizeIndexed> readUncompressed( private WritableSupplier<IndexedInts> readSingleValuedColumn(VERSION version, ByteBuffer buffer)
VERSION version,
ByteBuffer buffer
)
{ {
final WritableSupplier<IndexedInts> singleValuedColumn;
final VSizeIndexed multiValuedColumn;
switch (version) { switch (version) {
case UNCOMPRESSED_SINGLE_VALUE: case UNCOMPRESSED_SINGLE_VALUE:
singleValuedColumn = VSizeIndexedInts.readFromByteBuffer(buffer).asWritableSupplier(); return VSizeIndexedInts.readFromByteBuffer(buffer).asWritableSupplier();
multiValuedColumn = null; case COMPRESSED:
break; return CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder);
case UNCOMPRESSED_MULTI_VALUE:
singleValuedColumn = null;
multiValuedColumn = VSizeIndexed.readFromByteBuffer(buffer);
break;
default:
throw new IAE("Unsupported version[%s]", version);
} }
throw new IAE("Unsupported single-value version[%s]", version);
}
return Pair.of(singleValuedColumn, multiValuedColumn); private WritableSupplier<IndexedMultivalue<IndexedInts>> readMultiValuedColum(VERSION version, ByteBuffer buffer)
{
switch (version) {
case UNCOMPRESSED_MULTI_VALUE:
return VSizeIndexed.readFromByteBuffer(buffer).asWritableSupplier();
case COMPRESSED:
return CompressedVSizeIndexedSupplier.fromByteBuffer(buffer, byteOrder);
}
throw new IAE("Unsupported multi-value version[%s]", version);
} }
@Override @Override

View File

@ -0,0 +1,137 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.segment.CompressedVSizeIndexedSupplier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
*/
public class CompressedVSizeIndexedSupplierTest
{
private List<int[]> vals;
private CompressedVSizeIndexedSupplier indexedSupplier;
@Before
public void setUpSimple(){
vals = Arrays.asList(
new int[1],
new int[]{1, 2, 3, 4, 5},
new int[]{6, 7, 8, 9, 10},
new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
);
indexedSupplier = CompressedVSizeIndexedSupplier.fromIterable(
Iterables.transform(
vals,
new Function<int[], IndexedInts>()
{
@Override
public IndexedInts apply(int[] input)
{
return VSizeIndexedInts.fromArray(input, 20);
}
}
), 20, ByteOrder.nativeOrder(),
CompressedObjectStrategy.CompressionStrategy.LZ4
);
}
@After
public void teardown(){
indexedSupplier = null;
vals = null;
}
@Test
public void testSanity() throws Exception
{
assertSame(vals, indexedSupplier.get());
}
@Test
public void testSerde() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
indexedSupplier.writeToChannel(Channels.newChannel(baos));
final byte[] bytes = baos.toByteArray();
Assert.assertEquals(indexedSupplier.getSerializedSize(), bytes.length);
CompressedVSizeIndexedSupplier deserializedIndexed = CompressedVSizeIndexedSupplier.fromByteBuffer(
ByteBuffer.wrap(bytes),
ByteOrder.nativeOrder()
);
assertSame(vals, deserializedIndexed.get());
}
@Test(expected = IllegalArgumentException.class)
public void testGetInvalidElementInRow(){
indexedSupplier.get().get(3).get(15);
}
@Test
public void testIterators(){
Iterator<IndexedInts> iterator = indexedSupplier.get().iterator();
int row = 0;
while(iterator.hasNext()){
final int[] ints = vals.get(row);
final IndexedInts vSizeIndexedInts = iterator.next();
Assert.assertEquals(ints.length, vSizeIndexedInts.size());
Iterator<Integer> valsIterator = vSizeIndexedInts.iterator();
int j=0;
while(valsIterator.hasNext()){
Assert.assertEquals((Integer)ints[j], valsIterator.next());
j++;
}
row ++;
}
}
private void assertSame(List<int[]> someInts, IndexedMultivalue<IndexedInts> indexed)
{
Assert.assertEquals(someInts.size(), indexed.size());
for (int i = 0; i < indexed.size(); ++i) {
final int[] ints = someInts.get(i);
final IndexedInts vSizeIndexedInts = indexed.get(i);
Assert.assertEquals(ints.length, vSizeIndexedInts.size());
for (int j = 0; j < ints.length; j++) {
Assert.assertEquals(ints[j], vSizeIndexedInts.get(j));
}
}
}
}