From a1494c30e04acca6e5515eacde1a030b57e7b50b Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Wed, 29 Jan 2020 14:08:19 -0800 Subject: [PATCH] Join microbenchmark (#9267) Add microbenchmark for joins. Enabling the column cache improves performance by ~70% for the benchmarks for joins with string keys. Adjusting LookupJoinMatcher.matchCondition() to have fewer branches improves performance by ~10% for the benchmarks for joins with lookups. --- .../benchmark/JoinAndLookupBenchmark.java | 501 ++++++++++++++++++ .../join/lookup/LookupJoinMatcher.java | 42 +- .../join/table/IndexedTableJoinMatcher.java | 3 +- .../apache/druid/segment/IndexBuilder.java | 14 +- .../org/apache/druid/segment/TestHelper.java | 18 +- 5 files changed, 547 insertions(+), 31 deletions(-) create mode 100644 benchmarks/src/main/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java new file mode 100644 index 00000000000..c57459ce1e2 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.expression.LookupExprMacro; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.MapLookupExtractorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.join.HashJoinSegment; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinTestHelper; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.JoinableClause; +import org.apache.druid.segment.join.lookup.LookupJoinable; +import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.timeline.SegmentId; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import 
org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +public class JoinAndLookupBenchmark +{ + private static final String LOOKUP_COUNTRY_CODE_TO_NAME = "country_code_to_name"; + private static final String LOOKUP_COUNTRY_NUMBER_TO_NAME = "country_number_to_name"; + + static { + NullHandling.initializeForTests(); + } + + @Param({"500000"}) + int rows; + + @Param({"0", "16384"}) + int columnCacheSizeBytes; + + private File tmpDir = null; + private QueryableIndex index = null; + private Segment baseSegment = null; + private Segment hashJoinLookupStringKeySegment = null; + private Segment hashJoinLookupLongKeySegment = null; + private Segment hashJoinIndexedTableStringKeySegment = null; + private Segment hashJoinIndexedTableLongKeySegment = null; + private VirtualColumns lookupVirtualColumns = null; + + @TearDown + public void tearDown() throws IOException + { + if (index != null) { + index.close(); + } + + if (tmpDir != null) { + FileUtils.deleteDirectory(tmpDir); + } + } + + @Setup() + public void setup() throws IOException + { + tmpDir = FileUtils.createTempDir(); + ColumnConfig columnConfig = () -> columnCacheSizeBytes; + index = JoinTestHelper.createFactIndexBuilder(tmpDir, rows).buildMMappedIndex(columnConfig); + + final String prefix = "c."; + + baseSegment = new QueryableIndexSegment(index, SegmentId.dummy("join")); + + 
hashJoinLookupStringKeySegment = new HashJoinSegment( + baseSegment, + ImmutableList.of( + new JoinableClause( + prefix, + LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format("countryIsoCode == \"%sk\"", prefix), + prefix, + ExprMacroTable.nil() + ) + ) + ) + ); + + hashJoinLookupLongKeySegment = new HashJoinSegment( + baseSegment, + ImmutableList.of( + new JoinableClause( + prefix, + LookupJoinable.wrap(JoinTestHelper.createCountryNumberToNameLookup()), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format("countryNumber == \"%sk\"", prefix), + prefix, + ExprMacroTable.nil() + ) + ) + ) + ); + + hashJoinIndexedTableStringKeySegment = new HashJoinSegment( + baseSegment, + ImmutableList.of( + new JoinableClause( + prefix, + new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format("countryIsoCode == \"%scountryIsoCode\"", prefix), + prefix, + ExprMacroTable.nil() + ) + ) + ) + ); + + hashJoinIndexedTableLongKeySegment = new HashJoinSegment( + baseSegment, + ImmutableList.of( + new JoinableClause( + prefix, + new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format("countryNumber == \"%scountryNumber\"", prefix), + prefix, + ExprMacroTable.nil() + ) + ) + ) + ); + + final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap(); + final Map<String, String> countryNumberToNameMap = JoinTestHelper.createCountryNumberToNameLookup().getMap(); + + final ExprMacroTable exprMacroTable = new ExprMacroTable( + ImmutableList.of( + new LookupExprMacro( + lookupName -> { + if (LOOKUP_COUNTRY_CODE_TO_NAME.equals(lookupName)) { + return new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory(countryCodeToNameMap, false) + ); + } else if 
(LOOKUP_COUNTRY_NUMBER_TO_NAME.equals(lookupName)) { + return new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory(countryNumberToNameMap, false) + ); + } else { + return null; + } + } + ) + ) + ); + + lookupVirtualColumns = VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + LOOKUP_COUNTRY_CODE_TO_NAME, + "lookup(countryIsoCode, '" + LOOKUP_COUNTRY_CODE_TO_NAME + "')", + ValueType.STRING, + exprMacroTable + ), + new ExpressionVirtualColumn( + LOOKUP_COUNTRY_NUMBER_TO_NAME, + "lookup(countryNumber, '" + LOOKUP_COUNTRY_NUMBER_TO_NAME + "')", + ValueType.STRING, + exprMacroTable + ) + ) + ); + } + + private static String getLastValue(final Sequence<Cursor> cursors, final String dimension) + { + return cursors.map( + cursor -> { + final DimensionSelector selector = cursor.getColumnSelectorFactory() + .makeDimensionSelector(DefaultDimensionSpec.of(dimension)); + + if (selector.getValueCardinality() < 0) { + String lastValue = null; + while (!cursor.isDone()) { + final IndexedInts row = selector.getRow(); + final int sz = row.size(); + for (int i = 0; i < sz; i++) { + lastValue = selector.lookupName(row.get(i)); + } + cursor.advance(); + } + return lastValue; + } else { + int lastValue = -1; + while (!cursor.isDone()) { + final IndexedInts row = selector.getRow(); + final int sz = row.size(); + for (int i = 0; i < sz; i++) { + lastValue = row.get(i); + } + cursor.advance(); + } + return selector.lookupName(lastValue); + } + } + ).accumulate(null, (acc, in) -> in); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void baseSegment(Blackhole blackhole) + { + final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "countryIsoCode")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + 
public void baseSegmentWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors( + new SelectorDimFilter("countryIsoCode", "CA", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "countryIsoCode")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinLookupStringKey(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinLookupStringKeySegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.v")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinLookupStringKeyWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinLookupStringKeySegment.asStorageAdapter().makeCursors( + new SelectorDimFilter("c.v", "Canada", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.v")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinLookupLongKey(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinLookupLongKeySegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.v")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinLookupLongKeyWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinLookupLongKeySegment.asStorageAdapter().makeCursors( + new SelectorDimFilter("c.v", "Canada", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + 
blackhole.consume(getLastValue(cursors, "c.v")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinIndexedTableLongKey(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinIndexedTableLongKeySegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.countryName")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinIndexedTableLongKeyWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinIndexedTableLongKeySegment.asStorageAdapter().makeCursors( + new SelectorDimFilter("c.countryName", "Canada", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.countryName")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinIndexedTableStringKey(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinIndexedTableStringKeySegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.countryName")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void joinIndexedTableStringKeyWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = hashJoinIndexedTableStringKeySegment.asStorageAdapter().makeCursors( + new SelectorDimFilter("c.countryName", "Canada", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, "c.countryName")); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void lookupVirtualColumnStringKey(Blackhole 
blackhole) + { + final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + lookupVirtualColumns, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_CODE_TO_NAME)); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void lookupVirtualColumnStringKeyWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors( + new SelectorDimFilter(LOOKUP_COUNTRY_CODE_TO_NAME, "Canada", null).toFilter(), + Intervals.ETERNITY, + lookupVirtualColumns, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_CODE_TO_NAME)); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void lookupVirtualColumnLongKey(Blackhole blackhole) + { + final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors( + null, + Intervals.ETERNITY, + lookupVirtualColumns, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_NUMBER_TO_NAME)); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void lookupVirtualColumnLongKeyWithFilter(Blackhole blackhole) + { + final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors( + new SelectorDimFilter(LOOKUP_COUNTRY_NUMBER_TO_NAME, "Canada", null).toFilter(), + Intervals.ETERNITY, + lookupVirtualColumns, + Granularities.ALL, + false, + null + ); + + blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_NUMBER_TO_NAME)); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java index 9af6df93fdc..d435d62d327 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java +++ 
b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java @@ -227,33 +227,41 @@ public class LookupJoinMatcher implements JoinMatcher return; } + Iterator<Supplier<String>> keySupplierIterator = keySuppliers.iterator(); + String theKey = keySupplierIterator.next().get(); + + if (theKey == null) { + currentEntry.set(null); + return; + } + // In order to match, all keySuppliers must return the same string, which must be a key in the lookup. - String theKey = null; - - for (Supplier<String> keySupplier : keySuppliers) { - final String key = keySupplier.get(); - - if (key == null || (theKey != null && !theKey.equals(key))) { + while (keySupplierIterator.hasNext()) { + if (!theKey.equals(keySupplierIterator.next().get())) { currentEntry.set(null); return; - } else { - theKey = key; } } // All keySuppliers matched. Check if they are actually in the lookup. - final String theValue = extractor.apply(theKey); + checkInLookup(theKey); + } + } - if (theValue != null) { - assert theKey != null; - currentEntry.set(Pair.of(theKey, theValue)); + private void checkInLookup(String theKey) + { + // All keySuppliers matched. Check if they are actually in the lookup. 
+ final String theValue = extractor.apply(theKey); - if (matchedKeys != null) { - matchedKeys.add(theKey); - } - } else { - currentEntry.set(null); + if (theValue != null) { + assert theKey != null; + currentEntry.set(Pair.of(theKey, theValue)); + + if (matchedKeys != null) { + matchedKeys.add(theKey); } + } else { + currentEntry.set(null); } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java index 16e322045a9..d86fb5df30b 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.JoinMatcher; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; @@ -80,7 +81,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher this.conditionMatchers = condition.getEquiConditions() .stream() .map(eq -> makeConditionMatcher(table, leftSelectorFactory, eq)) - .collect(Collectors.toList()); + .collect(Collectors.toCollection(ArrayList::new)); } else { throw new IAE( "Cannot build hash-join matcher on non-equi-join condition: %s", diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index a47ea001be8..654a71b31dd 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import 
org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; @@ -108,14 +109,23 @@ public class IndexBuilder } public QueryableIndex buildMMappedIndex() + { + ColumnConfig noCacheColumnConfig = () -> 0; + return buildMMappedIndex(noCacheColumnConfig); + } + + public QueryableIndex buildMMappedIndex(ColumnConfig columnConfig) { Preconditions.checkNotNull(indexMerger, "indexMerger"); Preconditions.checkNotNull(tmpDir, "tmpDir"); try (final IncrementalIndex incrementalIndex = buildIncrementalIndex()) { - return TestHelper.getTestIndexIO().loadIndex( + return TestHelper.getTestIndexIO(columnConfig).loadIndex( indexMerger.persist( incrementalIndex, - new File(tmpDir, StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))), + new File( + tmpDir, + StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)) + ), indexSpec, null ) diff --git a/processing/src/test/java/org/apache/druid/segment/TestHelper.java b/processing/src/test/java/org/apache/druid/segment/TestHelper.java index 8264767e601..a6eb8b03ce1 100644 --- a/processing/src/test/java/org/apache/druid/segment/TestHelper.java +++ b/processing/src/test/java/org/apache/druid/segment/TestHelper.java @@ -62,17 +62,13 @@ public class TestHelper public static IndexIO getTestIndexIO() { - return new IndexIO( - JSON_MAPPER, - new ColumnConfig() - { - @Override - public int columnCacheSizeBytes() - { - return 0; - } - } - ); + ColumnConfig noCacheColumnConfig = () -> 0; + return getTestIndexIO(noCacheColumnConfig); + } + + public static IndexIO getTestIndexIO(ColumnConfig columnConfig) + { + return new IndexIO(JSON_MAPPER, columnConfig); } public static ObjectMapper makeJsonMapper()