Composite approach for checking in-filter values set in column dictionary (#13133)

This commit is contained in:
Rohan Garg 2022-10-13 12:32:48 +05:30 committed by GitHub
parent 346fbf133f
commit 45dfd679e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 476 additions and 9 deletions

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.collect.FluentIterable;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.BitmapColumnIndex;
import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.serde.DictionaryEncodedStringIndexSupplier;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
public class DictionaryEncodedStringIndexSupplierBenchmark
{
static {
NullHandling.initializeForTests();
}
@State(Scope.Benchmark)
public static class BenchmarkState
{
@Nullable
private DictionaryEncodedStringIndexSupplier.GenericIndexedDictionaryEncodedStringValueSetIndex stringValueSetIndex;
private final TreeSet<ByteBuffer> values = new TreeSet<>();
private static final int START_INT = 10_000_000;
// cardinality of the dictionary. it will contain this many ints (as strings, of course), starting at START_INT,
// even numbers only.
@Param({"1000000"})
int dictionarySize;
@Param({"1", "2", "5", "10", "15", "20", "30", "50", "100"})
int filterToDictionaryPercentage;
@Param({"10", "100"})
int selectivityPercentage;
@Setup(Level.Trial)
public void setup()
{
final BitmapFactory bitmapFactory = new RoaringBitmapFactory();
final BitmapSerdeFactory serdeFactory = new RoaringBitmapSerdeFactory(null);
final Iterable<Integer> ints = intGenerator();
final GenericIndexed<String> dictionary = GenericIndexed.fromIterable(
FluentIterable.from(ints)
.transform(Object::toString),
GenericIndexed.STRING_STRATEGY
);
final GenericIndexed<ByteBuffer> dictionaryUtf8 = GenericIndexed.fromIterable(
FluentIterable.from(ints)
.transform(i -> ByteBuffer.wrap(StringUtils.toUtf8(String.valueOf(i)))),
GenericIndexed.BYTE_BUFFER_STRATEGY
);
final GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.fromIterable(
() -> IntStream.range(0, dictionarySize)
.mapToObj(
i -> {
final MutableBitmap mutableBitmap = bitmapFactory.makeEmptyMutableBitmap();
mutableBitmap.add(i);
return bitmapFactory.makeImmutableBitmap(mutableBitmap);
}
)
.iterator(),
serdeFactory.getObjectStrategy()
);
DictionaryEncodedStringIndexSupplier dictionaryEncodedStringIndexSupplier =
new DictionaryEncodedStringIndexSupplier(bitmapFactory, dictionary, dictionaryUtf8, bitmaps, null);
stringValueSetIndex =
(DictionaryEncodedStringIndexSupplier.GenericIndexedDictionaryEncodedStringValueSetIndex)
dictionaryEncodedStringIndexSupplier.as(StringValueSetIndex.class);
List<Integer> filterValues = new ArrayList<>();
List<Integer> nonFilterValues = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
filterValues.add(START_INT + i * 2);
nonFilterValues.add(START_INT + i * 2 + 1);
}
Random r = new Random(9001);
Collections.shuffle(filterValues);
Collections.shuffle(nonFilterValues);
values.clear();
for (int i = 0; i < filterToDictionaryPercentage * dictionarySize / 100; i++) {
if (r.nextInt(100) < selectivityPercentage) {
values.add(ByteBuffer.wrap((filterValues.get(i).toString()).getBytes(StandardCharsets.UTF_8)));
} else {
values.add(ByteBuffer.wrap((nonFilterValues.get(i).toString()).getBytes(StandardCharsets.UTF_8)));
}
}
}
private Iterable<Integer> intGenerator()
{
// i * 2 => half of these values will be present in the inFilter, half won't.
return () -> IntStream.range(0, dictionarySize).map(i -> START_INT + i * 2).boxed().iterator();
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void doValueSetCheck(Blackhole blackhole, BenchmarkState state)
{
BitmapColumnIndex bitmapIndex = state.stringValueSetIndex.forSortedValuesUtf8(state.values);
bitmapIndex.estimateSelectivity(10_000_000);
}
}

View File

@ -64,6 +64,7 @@ public class InFilterBenchmark
private static final int START_INT = 10_000_000; private static final int START_INT = 10_000_000;
private InDimFilter inFilter; private InDimFilter inFilter;
private InDimFilter endInDimFilter;
// cardinality of the dictionary. it will contain this many ints (as strings, of course), starting at START_INT, // cardinality of the dictionary. it will contain this many ints (as strings, of course), starting at START_INT,
// even numbers only. // even numbers only.
@ -71,7 +72,7 @@ public class InFilterBenchmark
int dictionarySize; int dictionarySize;
// cardinality of the "in" filter. half of its values will be in the dictionary, half will not. // cardinality of the "in" filter. half of its values will be in the dictionary, half will not.
@Param({"10000"}) @Param({"1", "10", "100", "1000", "10000"})
int filterSize; int filterSize;
// selector will contain a "dictionarySize" number of bitmaps; each one contains a single int. // selector will contain a "dictionarySize" number of bitmaps; each one contains a single int.
@ -114,6 +115,12 @@ public class InFilterBenchmark
"dummy", "dummy",
IntStream.range(START_INT, START_INT + filterSize).mapToObj(String::valueOf).collect(Collectors.toSet()) IntStream.range(START_INT, START_INT + filterSize).mapToObj(String::valueOf).collect(Collectors.toSet())
); );
endInDimFilter = new InDimFilter(
"dummy",
IntStream.range(START_INT + dictionarySize * 2, START_INT + dictionarySize * 2 + 1)
.mapToObj(String::valueOf)
.collect(Collectors.toSet())
);
} }
@Benchmark @Benchmark
@ -125,6 +132,15 @@ public class InFilterBenchmark
blackhole.consume(bitmapIndex); blackhole.consume(bitmapIndex);
} }
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void doFilterAtEnd(Blackhole blackhole)
{
final ImmutableBitmap bitmapIndex = Filters.computeDefaultBitmapResults(endInDimFilter, selector);
blackhole.consume(bitmapIndex);
}
private Iterable<Integer> intGenerator() private Iterable<Integer> intGenerator()
{ {
// i * 2 => half of these values will be present in the inFilter, half won't. // i * 2 => half of these values will be present in the inFilter, half won't.

View File

@ -21,6 +21,8 @@ package org.apache.druid.segment.serde;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import it.unimi.dsi.fastutil.ints.IntIntImmutablePair; import it.unimi.dsi.fastutil.ints.IntIntImmutablePair;
import it.unimi.dsi.fastutil.ints.IntIntPair; import it.unimi.dsi.fastutil.ints.IntIntPair;
import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntIterator;
@ -175,6 +177,13 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
extends BaseGenericIndexedDictionaryEncodedIndex<ByteBuffer> implements StringValueSetIndex, Utf8ValueSetIndex extends BaseGenericIndexedDictionaryEncodedIndex<ByteBuffer> implements StringValueSetIndex, Utf8ValueSetIndex
{ {
private static final int SIZE_WORTH_CHECKING_MIN = 8; private static final int SIZE_WORTH_CHECKING_MIN = 8;
// This determines the cut-off point to swtich the merging algorithm from doing binary-search per element in the value
// set to doing a sorted merge algorithm between value set and dictionary. The ratio here represents the ratio b/w
// the number of elements in value set and the number of elements in the dictionary. The number has been derived
// using benchmark in https://github.com/apache/druid/pull/13133. If the ratio is higher than the threshold, we use
// sorted merge instead of binary-search based algorithm.
private static final double SORTED_MERGE_RATIO_THRESHOLD = 0.12D;
private final GenericIndexed<ByteBuffer> genericIndexedDictionary;
public GenericIndexedDictionaryEncodedStringValueSetIndex( public GenericIndexedDictionaryEncodedStringValueSetIndex(
BitmapFactory bitmapFactory, BitmapFactory bitmapFactory,
@ -183,6 +192,7 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
) )
{ {
super(bitmapFactory, dictionary, bitmaps); super(bitmapFactory, dictionary, bitmaps);
this.genericIndexedDictionary = dictionary;
} }
@Override @Override
@ -219,7 +229,8 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
Iterables.transform( Iterables.transform(
values, values,
input -> ByteBuffer.wrap(StringUtils.toUtf8(input)) input -> ByteBuffer.wrap(StringUtils.toUtf8(input))
) ),
values.size()
); );
} }
@ -235,24 +246,86 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
tailSet = valuesUtf8; tailSet = valuesUtf8;
} }
return getBitmapColumnIndexForSortedIterableUtf8(tailSet); return getBitmapColumnIndexForSortedIterableUtf8(tailSet, tailSet.size());
} }
/** /**
* Helper used by {@link #forSortedValues} and {@link #forSortedValuesUtf8}. * Helper used by {@link #forSortedValues} and {@link #forSortedValuesUtf8}.
*/ */
private BitmapColumnIndex getBitmapColumnIndexForSortedIterableUtf8(Iterable<ByteBuffer> valuesUtf8) private BitmapColumnIndex getBitmapColumnIndexForSortedIterableUtf8(Iterable<ByteBuffer> valuesUtf8, int size)
{ {
// for large number of in-filter values in comparison to the dictionary size, use the sorted merge algorithm.
if (size > SORTED_MERGE_RATIO_THRESHOLD * dictionary.size()) {
return new SimpleImmutableBitmapIterableIndex()
{
@Override
public Iterable<ImmutableBitmap> getBitmapIterable()
{
return () -> new Iterator<ImmutableBitmap>()
{
final PeekingIterator<ByteBuffer> valuesIterator = Iterators.peekingIterator(valuesUtf8.iterator());
final PeekingIterator<ByteBuffer> dictionaryIterator =
Iterators.peekingIterator(genericIndexedDictionary.iterator());
int next = -1;
int idx = 0;
@Override
public boolean hasNext()
{
if (next < 0) {
findNext();
}
return next >= 0;
}
@Override
public ImmutableBitmap next()
{
if (next < 0) {
findNext();
if (next < 0) {
throw new NoSuchElementException();
}
}
final int swap = next;
next = -1;
return getBitmap(swap);
}
private void findNext()
{
while (next < 0 && valuesIterator.hasNext() && dictionaryIterator.hasNext()) {
ByteBuffer nextValue = valuesIterator.peek();
ByteBuffer nextDictionaryKey = dictionaryIterator.peek();
int comparison = GenericIndexed.BYTE_BUFFER_STRATEGY.compare(nextValue, nextDictionaryKey);
if (comparison == 0) {
next = idx;
valuesIterator.next();
break;
} else if (comparison < 0) {
valuesIterator.next();
} else {
dictionaryIterator.next();
idx++;
}
}
}
};
}
};
}
// if the size of in-filter values is less than the threshold percentage of dictionary size, then use binary search
// based lookup per value. The algorithm works well for smaller number of values.
return new SimpleImmutableBitmapIterableIndex() return new SimpleImmutableBitmapIterableIndex()
{ {
@Override @Override
public Iterable<ImmutableBitmap> getBitmapIterable() public Iterable<ImmutableBitmap> getBitmapIterable()
{ {
final int dictionarySize = dictionary.size();
return () -> new Iterator<ImmutableBitmap>() return () -> new Iterator<ImmutableBitmap>()
{ {
final Iterator<ByteBuffer> iterator = valuesUtf8.iterator(); final PeekingIterator<ByteBuffer> valuesIterator = Iterators.peekingIterator(valuesUtf8.iterator());
final int dictionarySize = dictionary.size();
int next = -1; int next = -1;
@Override @Override
@ -280,8 +353,8 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
private void findNext() private void findNext()
{ {
while (next < 0 && iterator.hasNext()) { while (next < 0 && valuesIterator.hasNext()) {
ByteBuffer nextValue = iterator.next(); ByteBuffer nextValue = valuesIterator.next();
next = dictionary.indexOf(nextValue); next = dictionary.indexOf(nextValue);
if (next == -dictionarySize - 1) { if (next == -dictionarySize - 1) {

View File

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.serde;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.segment.column.BitmapColumnIndex;
import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
import org.roaringbitmap.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.TreeSet;
public class DictionaryEncodedStringIndexSupplierTest extends InitializedNullHandlingTest
{
BitmapSerdeFactory roaringFactory = new RoaringBitmapSerdeFactory(null);
BitmapResultFactory<ImmutableBitmap> bitmapResultFactory = new DefaultBitmapResultFactory(
roaringFactory.getBitmapFactory()
);
@Test
public void testStringColumnWithNullValueSetIndex() throws IOException
{
DictionaryEncodedStringIndexSupplier indexSupplier = makeStringWithNullsSupplier();
StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
Assert.assertNotNull(valueSetIndex);
// 10 rows
// dictionary: [null, b, foo, fooo, z]
// column: [foo, null, fooo, b, z, fooo, z, null, null, foo]
BitmapColumnIndex columnIndex = valueSetIndex.forValue("b");
Assert.assertNotNull(columnIndex);
Assert.assertEquals(0.1, columnIndex.estimateSelectivity(10), 0.0);
ImmutableBitmap bitmap = columnIndex.computeBitmapResult(bitmapResultFactory);
checkBitmap(bitmap, 3);
// non-existent in local column
columnIndex = valueSetIndex.forValue("fo");
Assert.assertNotNull(columnIndex);
Assert.assertEquals(0.0, columnIndex.estimateSelectivity(10), 0.0);
bitmap = columnIndex.computeBitmapResult(bitmapResultFactory);
checkBitmap(bitmap);
// set index
columnIndex = valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of("b", "fooo", "z")));
Assert.assertNotNull(columnIndex);
Assert.assertEquals(0.5, columnIndex.estimateSelectivity(10), 0.0);
bitmap = columnIndex.computeBitmapResult(bitmapResultFactory);
checkBitmap(bitmap, 2, 3, 4, 5, 6);
// set index with single value in middle
columnIndex = valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of("foo")));
Assert.assertNotNull(columnIndex);
Assert.assertEquals(0.2, columnIndex.estimateSelectivity(10), 0.0);
bitmap = columnIndex.computeBitmapResult(bitmapResultFactory);
checkBitmap(bitmap, 0, 9);
// set index with no element in column and all elements less than lowest non-null value
columnIndex = valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of("a", "aa", "aaa")));
Assert.assertNotNull(columnIndex);
Assert.assertEquals(0.0, columnIndex.estimateSelectivity(10), 0.0);
bitmap = columnIndex.computeBitmapResult(bitmapResultFactory);
checkBitmap(bitmap);
// set index with no element in column and all elements greater than highest non-null value
columnIndex = valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of("zz", "zzz", "zzzz")));
Assert.assertNotNull(columnIndex);
Assert.assertEquals(0.0, columnIndex.estimateSelectivity(10), 0.0);
bitmap = columnIndex.computeBitmapResult(bitmapResultFactory);
checkBitmap(bitmap);
}
private DictionaryEncodedStringIndexSupplier makeStringWithNullsSupplier() throws IOException
{
ByteBuffer stringBuffer = ByteBuffer.allocate(1 << 12);
ByteBuffer byteBuffer = ByteBuffer.allocate(1 << 12);
GenericIndexedWriter<String> stringWriter = new GenericIndexedWriter<>(
new OnHeapMemorySegmentWriteOutMedium(),
"strings",
GenericIndexed.STRING_STRATEGY
);
GenericIndexedWriter<ByteBuffer> byteBufferWriter = new GenericIndexedWriter<>(
new OnHeapMemorySegmentWriteOutMedium(),
"byteBuffers",
GenericIndexed.BYTE_BUFFER_STRATEGY
);
stringWriter.open();
byteBufferWriter.open();
ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
GenericIndexedWriter<ImmutableBitmap> bitmapWriter = new GenericIndexedWriter<>(
new OnHeapMemorySegmentWriteOutMedium(),
"bitmaps",
roaringFactory.getObjectStrategy()
);
bitmapWriter.setObjectsNotSorted();
bitmapWriter.open();
// 10 rows
// dictionary: [null, b, fo, foo, fooo, z]
// column: [foo, null, fooo, b, z, fooo, z, null, null, foo]
// null
stringWriter.write(null);
byteBufferWriter.write(null);
bitmapWriter.write(fillBitmap(1, 7, 8));
// b
stringWriter.write("b");
byteBufferWriter.write(ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8)));
bitmapWriter.write(fillBitmap(3));
// fo
stringWriter.write("foo");
byteBufferWriter.write(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
bitmapWriter.write(fillBitmap(0, 9));
// fooo
stringWriter.write("fooo");
byteBufferWriter.write(ByteBuffer.wrap("fooo".getBytes(StandardCharsets.UTF_8)));
bitmapWriter.write(fillBitmap(2, 5));
// z
stringWriter.write("z");
byteBufferWriter.write(ByteBuffer.wrap("z".getBytes(StandardCharsets.UTF_8)));
bitmapWriter.write(fillBitmap(4, 6));
writeToBuffer(stringBuffer, stringWriter);
writeToBuffer(byteBuffer, stringWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
return new DictionaryEncodedStringIndexSupplier(
roaringFactory.getBitmapFactory(),
GenericIndexed.read(stringBuffer, GenericIndexed.STRING_STRATEGY),
GenericIndexed.read(byteBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY),
bitmaps,
null
);
}
static void writeToBuffer(ByteBuffer buffer, Serializer serializer) throws IOException
{
WritableByteChannel channel = new WritableByteChannel()
{
@Override
public int write(ByteBuffer src)
{
int size = src.remaining();
buffer.put(src);
return size;
}
@Override
public boolean isOpen()
{
return true;
}
@Override
public void close()
{
}
};
serializer.writeTo(channel, null);
buffer.position(0);
}
private ImmutableBitmap fillBitmap(int... rows)
{
MutableBitmap bitmap = roaringFactory.getBitmapFactory().makeEmptyMutableBitmap();
for (int i : rows) {
bitmap.add(i);
}
return roaringFactory.getBitmapFactory().makeImmutableBitmap(bitmap);
}
private void checkBitmap(ImmutableBitmap bitmap, int... expectedRows)
{
IntIterator iterator = bitmap.iterator();
for (int i : expectedRows) {
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(i, iterator.next());
}
Assert.assertFalse(iterator.hasNext());
}
}