mirror of https://github.com/apache/druid.git
Speed up joins on indexed tables with string keys (#9278)
* Speed up joins on indexed tables with string keys When joining on index tables with string keys, caching the computation of row id to row numbers improves performance on the JoinAndLookupBenchmark.joinIndexTableStringKey* benchmarks by about 10% if the column cache is enabled an by about 100% if the column cache is disabled. * Faster cache impl and handle unknown cardinality * Remove unused dependency * Hoist cardinality check outside of hot loop * Fix dummy DimensionSelector for tests
This commit is contained in:
parent
868fdeb384
commit
0d2b16c1d0
|
@ -35,7 +35,7 @@ public class ConstantDimensionSelector implements SingleValueHistoricalDimension
|
|||
{
|
||||
private final String value;
|
||||
|
||||
ConstantDimensionSelector(final String value)
|
||||
public ConstantDimensionSelector(final String value)
|
||||
{
|
||||
if (NullHandling.isNullOrEquivalent(value)) {
|
||||
// There's an optimized implementation for nulls that callers should use instead.
|
||||
|
|
|
@ -19,9 +19,12 @@
|
|||
|
||||
package org.apache.druid.segment.join.table;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterator;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterators;
|
||||
import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
|
||||
import it.unimi.dsi.fastutil.ints.IntSet;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
|
@ -33,6 +36,7 @@ import org.apache.druid.segment.BaseObjectColumnValueSelector;
|
|||
import org.apache.druid.segment.ColumnProcessorFactory;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionDictionarySelector;
|
||||
import org.apache.druid.segment.DimensionSelector;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
|
@ -43,8 +47,12 @@ import org.apache.druid.segment.join.JoinMatcher;
|
|||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -239,15 +247,47 @@ public class IndexedTableJoinMatcher implements JoinMatcher
|
|||
/**
|
||||
* Makes suppliers that returns the list of IndexedTable rows that match the values from selectors.
|
||||
*/
|
||||
private static class ConditionMatcherFactory implements ColumnProcessorFactory<Supplier<IntIterator>>
|
||||
@VisibleForTesting
|
||||
static class ConditionMatcherFactory implements ColumnProcessorFactory<Supplier<IntIterator>>
|
||||
{
|
||||
@VisibleForTesting
|
||||
static final int CACHE_MAX_SIZE = 1000;
|
||||
|
||||
private static final int MAX_NUM_CACHE = 10;
|
||||
|
||||
private final ValueType keyType;
|
||||
private final IndexedTable.Index index;
|
||||
|
||||
// DimensionSelector -> (int) dimension id -> (IntList) row numbers
|
||||
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection") // updated via computeIfAbsent
|
||||
private final LruLoadingHashMap<DimensionSelector, Int2IntListMap> dimensionCaches;
|
||||
|
||||
ConditionMatcherFactory(ValueType keyType, IndexedTable.Index index)
|
||||
{
|
||||
this.keyType = keyType;
|
||||
this.index = index;
|
||||
|
||||
this.dimensionCaches = new LruLoadingHashMap<>(
|
||||
MAX_NUM_CACHE,
|
||||
selector -> {
|
||||
int cardinality = selector.getValueCardinality();
|
||||
IntFunction<IntList> loader = dimensionId -> getRowNumbers(selector, dimensionId);
|
||||
return cardinality <= CACHE_MAX_SIZE
|
||||
? new Int2IntListLookupTable(cardinality, loader)
|
||||
: new Int2IntListLruCache(CACHE_MAX_SIZE, loader);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private IntList getRowNumbers(DimensionSelector selector, int dimensionId)
|
||||
{
|
||||
final String key = selector.lookupName(dimensionId);
|
||||
return index.find(key);
|
||||
}
|
||||
|
||||
private IntList getAndCacheRowNumbers(DimensionSelector selector, int dimensionId)
|
||||
{
|
||||
return dimensionCaches.getAndLoadIfAbsent(selector).getAndLoadIfAbsent(dimensionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -259,17 +299,44 @@ public class IndexedTableJoinMatcher implements JoinMatcher
|
|||
@Override
|
||||
public Supplier<IntIterator> makeDimensionProcessor(DimensionSelector selector)
|
||||
{
|
||||
// NOTE: The slow (cardinality unknown) and fast (cardinality known) code paths below only differ in the calls to
|
||||
// getRowNumbers() and getAndCacheRowNumbers(), respectively. The majority of the code path is duplicated to avoid
|
||||
// adding indirection to fetch getRowNumbers()/getAndCacheRowNumbers() from a local BiFunction variable that is
|
||||
// set outside of the supplier. Minimizing overhead is desirable since the supplier is called from a hot loop for
|
||||
// joins.
|
||||
|
||||
if (selector.getValueCardinality() == DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
|
||||
// If the cardinality is unknown, then the selector does not have a "real" dictionary and the dimension id
|
||||
// is not valid outside the context of a specific row. This means we cannot use a cache and must fall
|
||||
// back to this slow code path.
|
||||
return () -> {
|
||||
final IndexedInts row = selector.getRow();
|
||||
|
||||
if (row.size() == 1) {
|
||||
final String key = selector.lookupName(row.get(0));
|
||||
return index.find(key).iterator();
|
||||
int dimensionId = row.get(0);
|
||||
IntList rowNumbers = getRowNumbers(selector, dimensionId);
|
||||
return rowNumbers.iterator();
|
||||
} else {
|
||||
// Multi-valued rows are not handled by the join system right now; treat them as nulls.
|
||||
return IntIterators.EMPTY_ITERATOR;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// If the cardinality is known, then the dimension id is still valid outside the context of a specific row and
|
||||
// its mapping to row numbers can be cached.
|
||||
return () -> {
|
||||
final IndexedInts row = selector.getRow();
|
||||
|
||||
if (row.size() == 1) {
|
||||
int dimensionId = row.get(0);
|
||||
IntList rowNumbers = getAndCacheRowNumbers(selector, dimensionId);
|
||||
return rowNumbers.iterator();
|
||||
} else {
|
||||
// Multi-valued rows are not handled by the join system right now; treat them as nulls.
|
||||
return IntIterators.EMPTY_ITERATOR;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -308,4 +375,111 @@ public class IndexedTableJoinMatcher implements JoinMatcher
|
|||
return () -> IntIterators.EMPTY_ITERATOR;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class LruLoadingHashMap<K, V> extends LinkedHashMap<K, V>
|
||||
{
|
||||
private final int maxSize;
|
||||
private final Function<K, V> loader;
|
||||
|
||||
LruLoadingHashMap(int maxSize, Function<K, V> loader)
|
||||
{
|
||||
super(capacity(maxSize));
|
||||
this.maxSize = maxSize;
|
||||
this.loader = loader;
|
||||
}
|
||||
|
||||
V getAndLoadIfAbsent(K key)
|
||||
{
|
||||
return computeIfAbsent(key, loader);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry eldest)
|
||||
{
|
||||
return size() > maxSize;
|
||||
}
|
||||
|
||||
private static int capacity(int expectedSize)
|
||||
{
|
||||
// This is the calculation used in JDK8 to resize when a putAll happens; it seems to be the most conservative
|
||||
// calculation we can make. 0.75 is the default load factor.
|
||||
return (int) ((float) expectedSize / 0.75F + 1.0F);
|
||||
}
|
||||
}
|
||||
|
||||
private interface Int2IntListMap
|
||||
{
|
||||
IntList getAndLoadIfAbsent(int key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Lookup table for keys in the range from 0 to maxSize - 1
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class Int2IntListLookupTable implements Int2IntListMap
|
||||
{
|
||||
private final IntList[] lookup;
|
||||
private final IntFunction<IntList> loader;
|
||||
|
||||
Int2IntListLookupTable(int maxSize, IntFunction<IntList> loader)
|
||||
{
|
||||
this.loader = loader;
|
||||
this.lookup = new IntList[maxSize];
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntList getAndLoadIfAbsent(int key)
|
||||
{
|
||||
IntList value = lookup[key];
|
||||
|
||||
if (value == null) {
|
||||
value = loader.apply(key);
|
||||
lookup[key] = value;
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* LRU cache optimized for primitive int keys
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class Int2IntListLruCache implements Int2IntListMap
|
||||
{
|
||||
private final Int2ObjectLinkedOpenHashMap<IntList> cache;
|
||||
private final int maxSize;
|
||||
private final IntFunction<IntList> loader;
|
||||
|
||||
Int2IntListLruCache(int maxSize, IntFunction<IntList> loader)
|
||||
{
|
||||
this.cache = new Int2ObjectLinkedOpenHashMap<>(maxSize);
|
||||
this.maxSize = maxSize;
|
||||
this.loader = loader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntList getAndLoadIfAbsent(int key)
|
||||
{
|
||||
IntList value = cache.getAndMoveToFirst(key);
|
||||
|
||||
if (value == null) {
|
||||
value = loader.apply(key);
|
||||
cache.putAndMoveToFirst(key, value);
|
||||
}
|
||||
|
||||
if (cache.size() > maxSize) {
|
||||
cache.removeLast();
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
IntList get(int key)
|
||||
{
|
||||
return cache.get(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,260 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.segment.join.table;
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterator;
|
||||
import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.segment.ConstantDimensionSelector;
|
||||
import org.apache.druid.segment.DimensionDictionarySelector;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.runners.Enclosed;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@RunWith(Enclosed.class)
|
||||
public class IndexedTableJoinMatcherTest
|
||||
{
|
||||
private static final int SIZE = 3;
|
||||
|
||||
@RunWith(Enclosed.class)
|
||||
public static class ConditionMatcherFactoryTest
|
||||
{
|
||||
public static class MakeDimensionProcessorTest
|
||||
{
|
||||
private static final String KEY = "key";
|
||||
|
||||
static {
|
||||
NullHandling.initializeForTests();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getsCorrectResultWhenSelectorCardinalityUnknown()
|
||||
{
|
||||
Supplier<IntIterator> target = makeDimensionProcessor(DimensionDictionarySelector.CARDINALITY_UNKNOWN);
|
||||
Assert.assertEquals(KEY.length(), target.get().nextInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getsCorrectResultWhenSelectorCardinalityLow()
|
||||
{
|
||||
int lowCardinality = IndexedTableJoinMatcher.ConditionMatcherFactory.CACHE_MAX_SIZE / 10;
|
||||
Supplier<IntIterator> target = makeDimensionProcessor(lowCardinality);
|
||||
Assert.assertEquals(KEY.length(), target.get().nextInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getsCorrectResultWhenSelectorCardinalityHigh()
|
||||
{
|
||||
int highCardinality = IndexedTableJoinMatcher.ConditionMatcherFactory.CACHE_MAX_SIZE / 10;
|
||||
Supplier<IntIterator> target = makeDimensionProcessor(highCardinality);
|
||||
Assert.assertEquals(KEY.length(), target.get().nextInt());
|
||||
}
|
||||
|
||||
private static Supplier<IntIterator> makeDimensionProcessor(int valueCardinality)
|
||||
{
|
||||
IndexedTableJoinMatcher.ConditionMatcherFactory conditionMatcherFactory =
|
||||
new IndexedTableJoinMatcher.ConditionMatcherFactory(
|
||||
ValueType.STRING,
|
||||
IndexedTableJoinMatcherTest::createSingletonIntList
|
||||
);
|
||||
return conditionMatcherFactory.makeDimensionProcessor(new TestDimensionSelector(KEY, valueCardinality));
|
||||
}
|
||||
|
||||
private static class TestDimensionSelector extends ConstantDimensionSelector
|
||||
{
|
||||
private final int valueCardinality;
|
||||
|
||||
TestDimensionSelector(String value, int valueCardinality)
|
||||
{
|
||||
super(value);
|
||||
this.valueCardinality = valueCardinality;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return valueCardinality;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class LruLoadingHashMapTest
|
||||
{
|
||||
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection") // updated via computeIfAbsent
|
||||
private IndexedTableJoinMatcher.LruLoadingHashMap<Long, Long> target;
|
||||
|
||||
private AtomicLong counter;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
counter = new AtomicLong(0);
|
||||
Function<Long, Long> loader = key -> {
|
||||
counter.incrementAndGet();
|
||||
return key;
|
||||
};
|
||||
|
||||
target = new IndexedTableJoinMatcher.LruLoadingHashMap<>(SIZE, loader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadsValueIfAbsent()
|
||||
{
|
||||
Long key = 1L;
|
||||
Assert.assertEquals(key, target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(1L, counter.longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void doesNotLoadIfPresent()
|
||||
{
|
||||
Long key = 1L;
|
||||
Assert.assertEquals(key, target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(key, target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(1L, counter.longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void evictsLeastRecentlyUsed()
|
||||
{
|
||||
Long start = 1L;
|
||||
Long next = start + SIZE;
|
||||
|
||||
for (long i = start; i < next; i++) {
|
||||
Long key = i;
|
||||
Assert.assertEquals(key, target.getAndLoadIfAbsent(key));
|
||||
}
|
||||
|
||||
Assert.assertEquals(next, target.getAndLoadIfAbsent(next));
|
||||
Assert.assertNull(target.get(start));
|
||||
|
||||
Assert.assertEquals(SIZE + 1, counter.longValue());
|
||||
}
|
||||
}
|
||||
|
||||
public static class Int2IntListLookupTableTest
|
||||
{
|
||||
private IndexedTableJoinMatcher.Int2IntListLookupTable target;
|
||||
|
||||
private AtomicLong counter;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
counter = new AtomicLong(0);
|
||||
IntFunction<IntList> loader = key -> {
|
||||
counter.incrementAndGet();
|
||||
return createSingletonIntList(key);
|
||||
};
|
||||
|
||||
target = new IndexedTableJoinMatcher.Int2IntListLookupTable(SIZE, loader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadsValueIfAbsent()
|
||||
{
|
||||
int key = 1;
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(1L, counter.longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void doesNotLoadIfPresent()
|
||||
{
|
||||
int key = 1;
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(1L, counter.longValue());
|
||||
}
|
||||
}
|
||||
|
||||
public static class Int2IntListLruCache
|
||||
{
|
||||
private IndexedTableJoinMatcher.Int2IntListLruCache target;
|
||||
|
||||
private AtomicLong counter;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
counter = new AtomicLong(0);
|
||||
IntFunction<IntList> loader = key -> {
|
||||
counter.incrementAndGet();
|
||||
return createSingletonIntList(key);
|
||||
};
|
||||
|
||||
target = new IndexedTableJoinMatcher.Int2IntListLruCache(SIZE, loader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadsValueIfAbsent()
|
||||
{
|
||||
int key = 1;
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(1L, counter.longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void doesNotLoadIfPresent()
|
||||
{
|
||||
int key = 1;
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
Assert.assertEquals(1L, counter.longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void evictsLeastRecentlyUsed()
|
||||
{
|
||||
int start = 1;
|
||||
int next = start + SIZE;
|
||||
|
||||
for (int key = start; key < next; key++) {
|
||||
Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key));
|
||||
}
|
||||
|
||||
Assert.assertEquals(createSingletonIntList(next), target.getAndLoadIfAbsent(next));
|
||||
Assert.assertNull(target.get(start));
|
||||
|
||||
Assert.assertEquals(SIZE + 1, counter.longValue());
|
||||
}
|
||||
}
|
||||
|
||||
private static IntList createSingletonIntList(Object value)
|
||||
{
|
||||
return createSingletonIntList(((String) value).length());
|
||||
}
|
||||
|
||||
private static IntList createSingletonIntList(int value)
|
||||
{
|
||||
return new IntArrayList(Collections.singleton(value));
|
||||
}
|
||||
}
|
|
@ -21,12 +21,14 @@ package org.apache.druid.segment.join.table;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
import org.apache.druid.query.InlineDataSource;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.ConstantDimensionSelector;
|
||||
import org.apache.druid.segment.DimensionSelector;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
|
@ -39,12 +41,16 @@ public class IndexedTableJoinableTest
|
|||
{
|
||||
private static final String PREFIX = "j.";
|
||||
|
||||
static {
|
||||
NullHandling.initializeForTests();
|
||||
}
|
||||
|
||||
private final ColumnSelectorFactory dummyColumnSelectorFactory = new ColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
return new ConstantDimensionSelector("dummy");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue