From 0d2b16c1d06a46c4d862f7637f55bfc1dd1c7f41 Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Tue, 4 Feb 2020 17:34:55 -0800 Subject: [PATCH] Speed up joins on indexed tables with string keys (#9278) * Speed up joins on indexed tables with string keys When joining on index tables with string keys, caching the computation of row id to row numbers improves performance on the JoinAndLookupBenchmark.joinIndexTableStringKey* benchmarks by about 10% if the column cache is enabled an by about 100% if the column cache is disabled. * Faster cache impl and handle unknown cardinality * Remove unused dependency * Hoist cardinality check outside of hot loop * Fix dummy DimensionSelector for tests --- .../segment/ConstantDimensionSelector.java | 2 +- .../join/table/IndexedTableJoinMatcher.java | 196 ++++++++++++- .../table/IndexedTableJoinMatcherTest.java | 260 ++++++++++++++++++ .../join/table/IndexedTableJoinableTest.java | 8 +- 4 files changed, 453 insertions(+), 13 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java diff --git a/processing/src/main/java/org/apache/druid/segment/ConstantDimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/ConstantDimensionSelector.java index 758aad82824..ff719c4de7a 100644 --- a/processing/src/main/java/org/apache/druid/segment/ConstantDimensionSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/ConstantDimensionSelector.java @@ -35,7 +35,7 @@ public class ConstantDimensionSelector implements SingleValueHistoricalDimension { private final String value; - ConstantDimensionSelector(final String value) + public ConstantDimensionSelector(final String value) { if (NullHandling.isNullOrEquivalent(value)) { // There's an optimized implementation for nulls that callers should use instead. diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java index d86fb5df30b..4f8d6d94f28 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java @@ -19,9 +19,12 @@ package org.apache.druid.segment.join.table; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntIterators; +import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntRBTreeSet; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.common.config.NullHandling; @@ -33,6 +36,7 @@ import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnProcessorFactory; import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.IndexedInts; @@ -43,8 +47,12 @@ import org.apache.druid.segment.join.JoinMatcher; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; +import java.util.function.Function; +import java.util.function.IntFunction; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -239,15 +247,47 @@ public class IndexedTableJoinMatcher implements JoinMatcher /** * Makes suppliers that returns the list of IndexedTable rows that match the values from selectors. */ - private static class ConditionMatcherFactory implements ColumnProcessorFactory> + @VisibleForTesting + static class ConditionMatcherFactory implements ColumnProcessorFactory> { + @VisibleForTesting + static final int CACHE_MAX_SIZE = 1000; + + private static final int MAX_NUM_CACHE = 10; + private final ValueType keyType; private final IndexedTable.Index index; + // DimensionSelector -> (int) dimension id -> (IntList) row numbers + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") // updated via computeIfAbsent + private final LruLoadingHashMap dimensionCaches; + ConditionMatcherFactory(ValueType keyType, IndexedTable.Index index) { this.keyType = keyType; this.index = index; + + this.dimensionCaches = new LruLoadingHashMap<>( + MAX_NUM_CACHE, + selector -> { + int cardinality = selector.getValueCardinality(); + IntFunction loader = dimensionId -> getRowNumbers(selector, dimensionId); + return cardinality <= CACHE_MAX_SIZE + ? new Int2IntListLookupTable(cardinality, loader) + : new Int2IntListLruCache(CACHE_MAX_SIZE, loader); + } + ); + } + + private IntList getRowNumbers(DimensionSelector selector, int dimensionId) + { + final String key = selector.lookupName(dimensionId); + return index.find(key); + } + + private IntList getAndCacheRowNumbers(DimensionSelector selector, int dimensionId) + { + return dimensionCaches.getAndLoadIfAbsent(selector).getAndLoadIfAbsent(dimensionId); } @Override @@ -259,17 +299,44 @@ public class IndexedTableJoinMatcher implements JoinMatcher @Override public Supplier makeDimensionProcessor(DimensionSelector selector) { - return () -> { - final IndexedInts row = selector.getRow(); + // NOTE: The slow (cardinality unknown) and fast (cardinality known) code paths below only differ in the calls to + // getRowNumbers() and getAndCacheRowNumbers(), respectively. The majority of the code path is duplicated to avoid + // adding indirection to fetch getRowNumbers()/getAndCacheRowNumbers() from a local BiFunction variable that is + // set outside of the supplier. Minimizing overhead is desirable since the supplier is called from a hot loop for + // joins. - if (row.size() == 1) { - final String key = selector.lookupName(row.get(0)); - return index.find(key).iterator(); - } else { - // Multi-valued rows are not handled by the join system right now; treat them as nulls. - return IntIterators.EMPTY_ITERATOR; - } - }; + if (selector.getValueCardinality() == DimensionDictionarySelector.CARDINALITY_UNKNOWN) { + // If the cardinality is unknown, then the selector does not have a "real" dictionary and the dimension id + // is not valid outside the context of a specific row. This means we cannot use a cache and must fall + // back to this slow code path. + return () -> { + final IndexedInts row = selector.getRow(); + + if (row.size() == 1) { + int dimensionId = row.get(0); + IntList rowNumbers = getRowNumbers(selector, dimensionId); + return rowNumbers.iterator(); + } else { + // Multi-valued rows are not handled by the join system right now; treat them as nulls. + return IntIterators.EMPTY_ITERATOR; + } + }; + } else { + // If the cardinality is known, then the dimension id is still valid outside the context of a specific row and + // its mapping to row numbers can be cached. + return () -> { + final IndexedInts row = selector.getRow(); + + if (row.size() == 1) { + int dimensionId = row.get(0); + IntList rowNumbers = getAndCacheRowNumbers(selector, dimensionId); + return rowNumbers.iterator(); + } else { + // Multi-valued rows are not handled by the join system right now; treat them as nulls. + return IntIterators.EMPTY_ITERATOR; + } + }; + } } @Override @@ -308,4 +375,111 @@ public class IndexedTableJoinMatcher implements JoinMatcher return () -> IntIterators.EMPTY_ITERATOR; } } + + @VisibleForTesting + static class LruLoadingHashMap extends LinkedHashMap + { + private final int maxSize; + private final Function loader; + + LruLoadingHashMap(int maxSize, Function loader) + { + super(capacity(maxSize)); + this.maxSize = maxSize; + this.loader = loader; + } + + V getAndLoadIfAbsent(K key) + { + return computeIfAbsent(key, loader); + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) + { + return size() > maxSize; + } + + private static int capacity(int expectedSize) + { + // This is the calculation used in JDK8 to resize when a putAll happens; it seems to be the most conservative + // calculation we can make. 0.75 is the default load factor. + return (int) ((float) expectedSize / 0.75F + 1.0F); + } + } + + private interface Int2IntListMap + { + IntList getAndLoadIfAbsent(int key); + } + + /** + * Lookup table for keys in the range from 0 to maxSize - 1 + */ + @VisibleForTesting + static class Int2IntListLookupTable implements Int2IntListMap + { + private final IntList[] lookup; + private final IntFunction loader; + + Int2IntListLookupTable(int maxSize, IntFunction loader) + { + this.loader = loader; + this.lookup = new IntList[maxSize]; + } + + @Override + public IntList getAndLoadIfAbsent(int key) + { + IntList value = lookup[key]; + + if (value == null) { + value = loader.apply(key); + lookup[key] = value; + } + + return value; + } + } + + /** + * LRU cache optimized for primitive int keys + */ + @VisibleForTesting + static class Int2IntListLruCache implements Int2IntListMap + { + private final Int2ObjectLinkedOpenHashMap cache; + private final int maxSize; + private final IntFunction loader; + + Int2IntListLruCache(int maxSize, IntFunction loader) + { + this.cache = new Int2ObjectLinkedOpenHashMap<>(maxSize); + this.maxSize = maxSize; + this.loader = loader; + } + + @Override + public IntList getAndLoadIfAbsent(int key) + { + IntList value = cache.getAndMoveToFirst(key); + + if (value == null) { + value = loader.apply(key); + cache.putAndMoveToFirst(key, value); + } + + if (cache.size() > maxSize) { + cache.removeLast(); + } + + return value; + } + + @VisibleForTesting + IntList get(int key) + { + return cache.get(key); + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java new file mode 100644 index 00000000000..64253d66f28 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.table; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.ConstantDimensionSelector; +import org.apache.druid.segment.DimensionDictionarySelector; +import org.apache.druid.segment.column.ValueType; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Supplier; + +@RunWith(Enclosed.class) +public class IndexedTableJoinMatcherTest +{ + private static final int SIZE = 3; + + @RunWith(Enclosed.class) + public static class ConditionMatcherFactoryTest + { + public static class MakeDimensionProcessorTest + { + private static final String KEY = "key"; + + static { + NullHandling.initializeForTests(); + } + + @Test + public void getsCorrectResultWhenSelectorCardinalityUnknown() + { + Supplier target = makeDimensionProcessor(DimensionDictionarySelector.CARDINALITY_UNKNOWN); + Assert.assertEquals(KEY.length(), target.get().nextInt()); + } + + @Test + public void getsCorrectResultWhenSelectorCardinalityLow() + { + int lowCardinality = IndexedTableJoinMatcher.ConditionMatcherFactory.CACHE_MAX_SIZE / 10; + Supplier target = makeDimensionProcessor(lowCardinality); + Assert.assertEquals(KEY.length(), target.get().nextInt()); + } + + @Test + public void getsCorrectResultWhenSelectorCardinalityHigh() + { + int highCardinality = IndexedTableJoinMatcher.ConditionMatcherFactory.CACHE_MAX_SIZE / 10; + Supplier target = makeDimensionProcessor(highCardinality); + Assert.assertEquals(KEY.length(), target.get().nextInt()); + } + + private static Supplier makeDimensionProcessor(int valueCardinality) + { + IndexedTableJoinMatcher.ConditionMatcherFactory conditionMatcherFactory = + new IndexedTableJoinMatcher.ConditionMatcherFactory( + ValueType.STRING, + IndexedTableJoinMatcherTest::createSingletonIntList + ); + return conditionMatcherFactory.makeDimensionProcessor(new TestDimensionSelector(KEY, valueCardinality)); + } + + private static class TestDimensionSelector extends ConstantDimensionSelector + { + private final int valueCardinality; + + TestDimensionSelector(String value, int valueCardinality) + { + super(value); + this.valueCardinality = valueCardinality; + } + + @Override + public int getValueCardinality() + { + return valueCardinality; + } + } + } + } + + public static class LruLoadingHashMapTest + { + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") // updated via computeIfAbsent + private IndexedTableJoinMatcher.LruLoadingHashMap target; + + private AtomicLong counter; + + @Before + public void setup() + { + counter = new AtomicLong(0); + Function loader = key -> { + counter.incrementAndGet(); + return key; + }; + + target = new IndexedTableJoinMatcher.LruLoadingHashMap<>(SIZE, loader); + } + + @Test + public void loadsValueIfAbsent() + { + Long key = 1L; + Assert.assertEquals(key, target.getAndLoadIfAbsent(key)); + Assert.assertEquals(1L, counter.longValue()); + } + + @Test + public void doesNotLoadIfPresent() + { + Long key = 1L; + Assert.assertEquals(key, target.getAndLoadIfAbsent(key)); + Assert.assertEquals(key, target.getAndLoadIfAbsent(key)); + Assert.assertEquals(1L, counter.longValue()); + } + + @Test + public void evictsLeastRecentlyUsed() + { + Long start = 1L; + Long next = start + SIZE; + + for (long i = start; i < next; i++) { + Long key = i; + Assert.assertEquals(key, target.getAndLoadIfAbsent(key)); + } + + Assert.assertEquals(next, target.getAndLoadIfAbsent(next)); + Assert.assertNull(target.get(start)); + + Assert.assertEquals(SIZE + 1, counter.longValue()); + } + } + + public static class Int2IntListLookupTableTest + { + private IndexedTableJoinMatcher.Int2IntListLookupTable target; + + private AtomicLong counter; + + @Before + public void setup() + { + counter = new AtomicLong(0); + IntFunction loader = key -> { + counter.incrementAndGet(); + return createSingletonIntList(key); + }; + + target = new IndexedTableJoinMatcher.Int2IntListLookupTable(SIZE, loader); + } + + @Test + public void loadsValueIfAbsent() + { + int key = 1; + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + Assert.assertEquals(1L, counter.longValue()); + } + + @Test + public void doesNotLoadIfPresent() + { + int key = 1; + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + Assert.assertEquals(1L, counter.longValue()); + } + } + + public static class Int2IntListLruCache + { + private IndexedTableJoinMatcher.Int2IntListLruCache target; + + private AtomicLong counter; + + @Before + public void setup() + { + counter = new AtomicLong(0); + IntFunction loader = key -> { + counter.incrementAndGet(); + return createSingletonIntList(key); + }; + + target = new IndexedTableJoinMatcher.Int2IntListLruCache(SIZE, loader); + } + + @Test + public void loadsValueIfAbsent() + { + int key = 1; + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + Assert.assertEquals(1L, counter.longValue()); + } + + @Test + public void doesNotLoadIfPresent() + { + int key = 1; + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + Assert.assertEquals(1L, counter.longValue()); + } + + @Test + public void evictsLeastRecentlyUsed() + { + int start = 1; + int next = start + SIZE; + + for (int key = start; key < next; key++) { + Assert.assertEquals(createSingletonIntList(key), target.getAndLoadIfAbsent(key)); + } + + Assert.assertEquals(createSingletonIntList(next), target.getAndLoadIfAbsent(next)); + Assert.assertNull(target.get(start)); + + Assert.assertEquals(SIZE + 1, counter.longValue()); + } + } + + private static IntList createSingletonIntList(Object value) + { + return createSingletonIntList(((String) value).length()); + } + + private static IntList createSingletonIntList(int value) + { + return new IntArrayList(Collections.singleton(value)); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java index 30beafcfc98..a10b699162d 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java @@ -21,12 +21,14 @@ package org.apache.druid.segment.join.table; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.ConstantDimensionSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; @@ -39,12 +41,16 @@ public class IndexedTableJoinableTest { private static final String PREFIX = "j."; + static { + NullHandling.initializeForTests(); + } + private final ColumnSelectorFactory dummyColumnSelectorFactory = new ColumnSelectorFactory() { @Override public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) { - return null; + return new ConstantDimensionSelector("dummy"); } @Override