diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
new file mode 100644
index 00000000000..74304b0f73b
--- /dev/null
+++ b/benchmarks/pom.xml
@@ -0,0 +1,154 @@
+
+
+
+
+ 4.0.0
+
+ druid-benchmarks
+ druid-benchmarks
+ jar
+
+ io.druid
+ druid
+ 0.8.0-SNAPSHOT
+
+
+
+ 3.0
+
+
+
+
+ org.openjdk.jmh
+ jmh-core
+ ${jmh.version}
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ ${jmh.version}
+ provided
+
+
+ io.druid
+ druid-processing
+ ${project.parent.version}
+
+
+
+
+ UTF-8
+ 1.9.2
+ 1.6
+ benchmarks
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+ ${javac.target}
+
+ ${javac.target}
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.2
+
+
+ package
+
+ shade
+
+
+ ${uberjar.name}
+
+
+ org.openjdk.jmh.Main
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
+
+ maven-clean-plugin
+ 2.5
+
+
+ maven-deploy-plugin
+ 2.8.1
+
+
+ maven-install-plugin
+ 2.5.1
+
+
+ maven-jar-plugin
+ 2.4
+
+
+ maven-javadoc-plugin
+ 2.9.1
+
+
+ maven-resources-plugin
+ 2.6
+
+
+ maven-site-plugin
+ 3.3
+
+
+ maven-source-plugin
+ 2.2.1
+
+
+ maven-surefire-plugin
+ 2.17
+
+
+
+
+
+
diff --git a/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java
new file mode 100644
index 00000000000..8e69a727966
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.druid.segment.CompressedVSizeIndexedSupplier;
+import io.druid.segment.data.CompressedObjectStrategy;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.data.IndexedMultivalue;
+import io.druid.segment.data.VSizeIndexed;
+import io.druid.segment.data.VSizeIndexedInts;
+import io.druid.segment.data.WritableSupplier;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+public class CompressedVSizeIndexedBenchmark
+{
+ private IndexedMultivalue uncompressed;
+ private IndexedMultivalue compressed;
+
+ @Param({"1", "2", "3", "4"})
+ int bytes;
+
+ @Param({"5", "10"})
+ int valuesPerRowBound;
+
+ @Setup
+ public void setup() throws IOException
+ {
+ Random rand = new Random(0);
+ List rows = Lists.newArrayList();
+ final int bound = 1 << bytes;
+ for (int i = 0; i < 0x10000; i++) {
+ int[] row = new int[rand.nextInt(valuesPerRowBound)];
+ int count = rand.nextInt(valuesPerRowBound);
+ for (int j = 0; j < row.length; j++) {
+ row[j] = rand.nextInt(bound);
+ }
+ rows.add(row);
+ }
+
+ final ByteBuffer bufferCompressed = serialize(
+ CompressedVSizeIndexedSupplier.fromIterable(
+ Iterables.transform(
+ rows,
+ new Function()
+ {
+ @Override
+ public IndexedInts apply(int[] input)
+ {
+ return VSizeIndexedInts.fromArray(input, 20);
+ }
+ }
+ ),
+ bound - 1,
+ ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
+ )
+ );
+ this.compressed = CompressedVSizeIndexedSupplier.fromByteBuffer(
+ bufferCompressed, ByteOrder.nativeOrder()
+ ).get();
+
+ final ByteBuffer bufferUncompressed = serialize(
+ VSizeIndexed.fromIterable(
+ Iterables.transform(
+ rows,
+ new Function()
+ {
+ @Override
+ public VSizeIndexedInts apply(int[] input)
+ {
+ return VSizeIndexedInts.fromArray(input, 20);
+ }
+ }
+ )
+ ).asWritableSupplier()
+ );
+ this.uncompressed = VSizeIndexed.readFromByteBuffer(bufferUncompressed);
+ }
+
+ private static ByteBuffer serialize(WritableSupplier> writableSupplier)
+ throws IOException
+ {
+ final ByteBuffer buffer = ByteBuffer.allocateDirect((int) writableSupplier.getSerializedSize());
+
+ WritableByteChannel channel = new WritableByteChannel()
+ {
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ int size = src.remaining();
+ buffer.put(src);
+ return size;
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ }
+ };
+
+ writableSupplier.writeToChannel(channel);
+ buffer.rewind();
+ return buffer;
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void uncompressed(Blackhole blackhole)
+ {
+ final int size = uncompressed.size();
+ for (int i = 0; i < size; ++i) {
+ IndexedInts row = uncompressed.get(i);
+ for (int j = 0; j < row.size(); j++) {
+ blackhole.consume(row.get(j));
+ }
+ }
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void compressed(Blackhole blackhole)
+ {
+ final int size = compressed.size();
+ for (int i = 0; i < size; ++i) {
+ IndexedInts row = compressed.get(i);
+ for (int j = 0; j < row.size(); j++) {
+ blackhole.consume(row.get(j));
+ }
+ }
+ }
+
+ public static void main(String... args) throws IOException, RunnerException
+ {
+ Options opt = new OptionsBuilder()
+ .include(CompressedVSizeIndexedBenchmark.class.getSimpleName())
+ .forks(1)
+ .build();
+
+ new Runner(opt).run();
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e9d5046826f..65e09189a98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
server
services
integration-tests
+ benchmarks
aws-common
extensions/cassandra-storage
diff --git a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java
new file mode 100644
index 00000000000..0f4bc8d7c05
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment;
+
+import com.metamx.common.IAE;
+import io.druid.segment.data.CompressedObjectStrategy;
+import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.data.IndexedIntsIterator;
+import io.druid.segment.data.IndexedIterable;
+import io.druid.segment.data.IndexedMultivalue;
+import io.druid.segment.data.WritableSupplier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Format -
+ * byte 1 - version
+ * offsets - indexed integers of length num of rows + 1 representing offsets of starting index of first element of each row in values index and last element equal to length of values column,
+ * the last element in the offsets represents the total length of values column.
+ * values - indexed integer representing values in each row
+ */
+
+public class CompressedVSizeIndexedSupplier implements WritableSupplier>
+{
+ private static final byte version = 0x2;
+ //offsets - indexed integers of length num of rows + 1 representing offsets of starting index of first element of each row in values index
+ // last element represents the length of values column
+ private final CompressedVSizeIntsIndexedSupplier offsetSupplier;
+
+ //values - indexed integers representing actual values in each row
+ private final CompressedVSizeIntsIndexedSupplier valueSupplier;
+
+ CompressedVSizeIndexedSupplier(
+ CompressedVSizeIntsIndexedSupplier offsetSupplier,
+ CompressedVSizeIntsIndexedSupplier valueSupplier
+ )
+ {
+ this.offsetSupplier = offsetSupplier;
+ this.valueSupplier = valueSupplier;
+ }
+
+ public long getSerializedSize()
+ {
+ return 1 + offsetSupplier.getSerializedSize() + valueSupplier.getSerializedSize();
+ }
+
+ public void writeToChannel(WritableByteChannel channel) throws IOException
+ {
+ channel.write(ByteBuffer.wrap(new byte[]{version}));
+ offsetSupplier.writeToChannel(channel);
+ valueSupplier.writeToChannel(channel);
+ }
+
+ public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
+ {
+ byte versionFromBuffer = buffer.get();
+
+ if (versionFromBuffer == version) {
+ CompressedVSizeIntsIndexedSupplier offsetSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
+ buffer,
+ order
+ );
+ CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
+ buffer,
+ order
+ );
+ return new CompressedVSizeIndexedSupplier(offsetSupplier, valueSupplier);
+ }
+ throw new IAE("Unknown version[%s]", versionFromBuffer);
+ }
+
+ public static CompressedVSizeIndexedSupplier fromIterable(
+ Iterable objectsIterable,
+ int maxValue,
+ final ByteOrder byteOrder,
+ CompressedObjectStrategy.CompressionStrategy compression
+ )
+ {
+ Iterator objects = objectsIterable.iterator();
+ List offsetList = new ArrayList<>();
+ List values = new ArrayList<>();
+
+ int offset = 0;
+ while (objects.hasNext()) {
+ IndexedInts next = objects.next();
+ offsetList.add(offset);
+ for (int i = 0; i < next.size(); i++) {
+ values.add(next.get(i));
+ }
+ offset += next.size();
+ }
+ offsetList.add(offset);
+ int offsetMax = offset;
+ CompressedVSizeIntsIndexedSupplier headerSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
+ offsetList,
+ offsetMax,
+ CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(offsetMax),
+ byteOrder,
+ compression
+ );
+ CompressedVSizeIntsIndexedSupplier valuesSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
+ values,
+ maxValue,
+ CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
+ byteOrder,
+ compression
+ );
+ return new CompressedVSizeIndexedSupplier(headerSupplier, valuesSupplier);
+ }
+
+
+ @Override
+ public IndexedMultivalue get()
+ {
+ return new CompressedVSizeIndexed(offsetSupplier.get(), valueSupplier.get());
+ }
+
+ public static class CompressedVSizeIndexed implements IndexedMultivalue
+ {
+ private final IndexedInts offsets;
+ private final IndexedInts values;
+
+
+ CompressedVSizeIndexed(IndexedInts offsets, IndexedInts values)
+ {
+ this.offsets = offsets;
+ this.values = values;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ offsets.close();
+ values.close();
+ }
+
+ @Override
+ public Class extends IndexedInts> getClazz()
+ {
+ return IndexedInts.class;
+ }
+
+ @Override
+ public int size()
+ {
+ return offsets.size() - 1;
+ }
+
+ @Override
+ public IndexedInts get(int index)
+ {
+ final int offset = offsets.get(index);
+ final int size = offsets.get(index + 1) - offset;
+
+ return new IndexedInts()
+ {
+ @Override
+ public int size()
+ {
+ return size;
+ }
+
+ @Override
+ public int get(int index)
+ {
+ if (index >= size) {
+ throw new IllegalArgumentException(String.format("Index[%s] >= size[%s]", index, size));
+ }
+ return values.get(index + offset);
+ }
+
+ @Override
+ public void fill(int index, int[] toFill)
+ {
+ throw new UnsupportedOperationException("fill not supported");
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ // no-op
+ }
+
+ @Override
+ public Iterator iterator()
+ {
+ return new IndexedIntsIterator(this);
+ }
+ };
+ }
+
+ @Override
+ public int indexOf(IndexedInts value)
+ {
+ throw new UnsupportedOperationException("Reverse lookup not allowed.");
+ }
+
+ @Override
+ public Iterator iterator()
+ {
+ return IndexedIterable.create(this).iterator();
+ }
+
+ }
+
+}
diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java
index 0888abd8c87..8fde6d90508 100644
--- a/processing/src/main/java/io/druid/segment/IndexIO.java
+++ b/processing/src/main/java/io/druid/segment/IndexIO.java
@@ -718,13 +718,16 @@ public class IndexIO
} else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
}
+ } else if (compressionStrategy != null) {
+ columnPartBuilder.withMultiValuedColumn(
+ CompressedVSizeIndexedSupplier.fromIterable(
+ multiValCol,
+ dictionary.size(),
+ BYTE_ORDER,
+ compressionStrategy
+ )
+ );
} else {
- if (compressionStrategy != null) {
- log.info(
- "Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
- dimension
- );
- }
columnPartBuilder.withMultiValuedColumn(multiValCol);
}
diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java
index 05b06786b41..06a525039c7 100644
--- a/processing/src/main/java/io/druid/segment/IndexMaker.java
+++ b/processing/src/main/java/io/druid/segment/IndexMaker.java
@@ -1188,16 +1188,20 @@ public class IndexMaker
} else {
dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
}
+ } else if (compressionStrategy != null) {
+ dimPartBuilder.withMultiValuedColumn(
+ CompressedVSizeIndexedSupplier.fromIterable(
+ multiValCol,
+ dictionary.size(),
+ IndexIO.BYTE_ORDER,
+ compressionStrategy
+ )
+ );
} else {
- if (compressionStrategy != null) {
- log.info(
- "Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
- dimension
- );
- }
dimPartBuilder.withMultiValuedColumn(multiValCol);
}
+
writeColumn(
v9Smoosher,
dimPartBuilder.build(),
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java
index 0509e3a55ae..5bd60147cf7 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java
@@ -69,7 +69,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier list, final int maxValue, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
+ final List list,
+ final int maxValue,
+ final int chunkFactor,
+ final ByteOrder byteOrder,
+ CompressedObjectStrategy.CompressionStrategy compression
)
{
final int numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue);
@@ -254,7 +260,8 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier
* Assumes the number of entries in each decompression buffers is a power of two.
*
* @param index index of the value in the column
+ *
* @return the value at the given index
*/
@Override
@@ -341,6 +351,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier dictionary = null;
@@ -145,6 +152,15 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
return this;
}
+ public Builder withMultiValuedColumn(CompressedVSizeIndexedSupplier multiValuedColumn)
+ {
+ Preconditions.checkState(singleValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
+ this.version = VERSION.COMPRESSED;
+ this.flags |= Feature.MULTI_VALUE.getMask();
+ this.multiValuedColumn = multiValuedColumn;
+ return this;
+ }
+
public DictionaryEncodedColumnPartSerde build()
{
Preconditions.checkArgument(
@@ -165,6 +181,8 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
byteOrder
);
}
+
+
}
public static Builder builder()
@@ -265,7 +283,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
public void write(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{version.asByte()}));
- if(version.compareTo(VERSION.COMPRESSED) >= 0) {
+ if (version.compareTo(VERSION.COMPRESSED) >= 0) {
channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
}
@@ -304,18 +322,15 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
final VERSION rVersion = VERSION.fromByte(buffer.get());
final int rFlags;
- if(rVersion.compareTo(VERSION.COMPRESSED) >= 0 ) {
+ if (rVersion.compareTo(VERSION.COMPRESSED) >= 0) {
rFlags = buffer.getInt();
} else {
rFlags = rVersion.equals(VERSION.UNCOMPRESSED_MULTI_VALUE) ?
- Feature.MULTI_VALUE.getMask() :
- NO_FLAGS;
+ Feature.MULTI_VALUE.getMask() :
+ NO_FLAGS;
}
final boolean hasMultipleValues = Feature.MULTI_VALUE.isSet(rFlags);
- if(rVersion.equals(VERSION.COMPRESSED) && hasMultipleValues) {
- throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns");
- }
final GenericIndexed rDictionary = GenericIndexed.read(buffer, GenericIndexed.STRING_STRATEGY);
builder.setType(ValueType.STRING);
@@ -323,13 +338,12 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
final WritableSupplier rSingleValuedColumn;
final WritableSupplier> rMultiValuedColumn;
- if (rVersion.compareTo(VERSION.COMPRESSED) >= 0) {
- rSingleValuedColumn = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder);
- rMultiValuedColumn = null;
+ if (hasMultipleValues) {
+ rMultiValuedColumn = readMultiValuedColum(rVersion, buffer);
+ rSingleValuedColumn = null;
} else {
- Pair, VSizeIndexed> cols = readUncompressed(rVersion, buffer);
- rSingleValuedColumn = cols.lhs;
- rMultiValuedColumn = cols.rhs == null ? null : cols.rhs.asWritableSupplier();
+ rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer);
+ rMultiValuedColumn = null;
}
builder.setHasMultipleValues(hasMultipleValues)
@@ -374,30 +388,26 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
);
}
- private static Pair, VSizeIndexed> readUncompressed(
- VERSION version,
- ByteBuffer buffer
- )
+ private WritableSupplier readSingleValuedColumn(VERSION version, ByteBuffer buffer)
{
- final WritableSupplier singleValuedColumn;
- final VSizeIndexed multiValuedColumn;
-
switch (version) {
case UNCOMPRESSED_SINGLE_VALUE:
- singleValuedColumn = VSizeIndexedInts.readFromByteBuffer(buffer).asWritableSupplier();
- multiValuedColumn = null;
- break;
-
- case UNCOMPRESSED_MULTI_VALUE:
- singleValuedColumn = null;
- multiValuedColumn = VSizeIndexed.readFromByteBuffer(buffer);
- break;
-
- default:
- throw new IAE("Unsupported version[%s]", version);
+ return VSizeIndexedInts.readFromByteBuffer(buffer).asWritableSupplier();
+ case COMPRESSED:
+ return CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder);
}
+ throw new IAE("Unsupported single-value version[%s]", version);
+ }
- return Pair.of(singleValuedColumn, multiValuedColumn);
+ private WritableSupplier> readMultiValuedColum(VERSION version, ByteBuffer buffer)
+ {
+ switch (version) {
+ case UNCOMPRESSED_MULTI_VALUE:
+ return VSizeIndexed.readFromByteBuffer(buffer).asWritableSupplier();
+ case COMPRESSED:
+ return CompressedVSizeIndexedSupplier.fromByteBuffer(buffer, byteOrder);
+ }
+ throw new IAE("Unsupported multi-value version[%s]", version);
}
@Override
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java
new file mode 100644
index 00000000000..49d0ce7b787
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import io.druid.segment.CompressedVSizeIndexedSupplier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ */
+public class CompressedVSizeIndexedSupplierTest
+{
+ private List vals;
+
+ private CompressedVSizeIndexedSupplier indexedSupplier;
+
+ @Before
+ public void setUpSimple(){
+ vals = Arrays.asList(
+ new int[1],
+ new int[]{1, 2, 3, 4, 5},
+ new int[]{6, 7, 8, 9, 10},
+ new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
+ );
+
+ indexedSupplier = CompressedVSizeIndexedSupplier.fromIterable(
+ Iterables.transform(
+ vals,
+ new Function()
+ {
+ @Override
+ public IndexedInts apply(int[] input)
+ {
+ return VSizeIndexedInts.fromArray(input, 20);
+ }
+ }
+ ), 20, ByteOrder.nativeOrder(),
+ CompressedObjectStrategy.CompressionStrategy.LZ4
+ );
+ }
+
+ @After
+ public void teardown(){
+ indexedSupplier = null;
+ vals = null;
+ }
+
+ @Test
+ public void testSanity() throws Exception
+ {
+ assertSame(vals, indexedSupplier.get());
+ }
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ indexedSupplier.writeToChannel(Channels.newChannel(baos));
+
+ final byte[] bytes = baos.toByteArray();
+ Assert.assertEquals(indexedSupplier.getSerializedSize(), bytes.length);
+ CompressedVSizeIndexedSupplier deserializedIndexed = CompressedVSizeIndexedSupplier.fromByteBuffer(
+ ByteBuffer.wrap(bytes),
+ ByteOrder.nativeOrder()
+ );
+
+ assertSame(vals, deserializedIndexed.get());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetInvalidElementInRow(){
+ indexedSupplier.get().get(3).get(15);
+ }
+
+ @Test
+ public void testIterators(){
+ Iterator iterator = indexedSupplier.get().iterator();
+ int row = 0;
+ while(iterator.hasNext()){
+ final int[] ints = vals.get(row);
+ final IndexedInts vSizeIndexedInts = iterator.next();
+
+ Assert.assertEquals(ints.length, vSizeIndexedInts.size());
+ Iterator valsIterator = vSizeIndexedInts.iterator();
+ int j=0;
+ while(valsIterator.hasNext()){
+ Assert.assertEquals((Integer)ints[j], valsIterator.next());
+ j++;
+ }
+ row ++;
+ }
+ }
+
+ private void assertSame(List someInts, IndexedMultivalue indexed)
+ {
+ Assert.assertEquals(someInts.size(), indexed.size());
+ for (int i = 0; i < indexed.size(); ++i) {
+ final int[] ints = someInts.get(i);
+ final IndexedInts vSizeIndexedInts = indexed.get(i);
+
+ Assert.assertEquals(ints.length, vSizeIndexedInts.size());
+ for (int j = 0; j < ints.length; j++) {
+ Assert.assertEquals(ints[j], vSizeIndexedInts.get(j));
+ }
+ }
+ }
+}