Join microbenchmark (#9267)

Add microbenchmark for joins. Enabling the column cache improves
performance by ~70% for the benchmarks for joins with string keys.
Adjusting LookupJoinMatcher.matchCondition() to have fewer branches,
improves performance by ~10% for the benchmarks for joins with lookups.
This commit is contained in:
Chi Cao Minh 2020-01-29 14:08:19 -08:00 committed by GitHub
parent 303b02eba1
commit a1494c30e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 547 additions and 31 deletions

View File

@ -0,0 +1,501 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.MapLookupExtractorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.join.HashJoinSegment;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.SegmentId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
public class JoinAndLookupBenchmark
{
private static final String LOOKUP_COUNTRY_CODE_TO_NAME = "country_code_to_name";
private static final String LOOKUP_COUNTRY_NUMBER_TO_NAME = "country_number_to_name";
static {
NullHandling.initializeForTests();
}
@Param({"500000"})
int rows;
@Param({"0", "16384"})
int columnCacheSizeBytes;
private File tmpDir = null;
private QueryableIndex index = null;
private Segment baseSegment = null;
private Segment hashJoinLookupStringKeySegment = null;
private Segment hashJoinLookupLongKeySegment = null;
private Segment hashJoinIndexedTableStringKeySegment = null;
private Segment hashJoinIndexedTableLongKeySegment = null;
private VirtualColumns lookupVirtualColumns = null;
@TearDown
public void tearDown() throws IOException
{
if (index != null) {
index.close();
}
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
}
@Setup()
public void setup() throws IOException
{
tmpDir = FileUtils.createTempDir();
ColumnConfig columnConfig = () -> columnCacheSizeBytes;
index = JoinTestHelper.createFactIndexBuilder(tmpDir, rows).buildMMappedIndex(columnConfig);
final String prefix = "c.";
baseSegment = new QueryableIndexSegment(index, SegmentId.dummy("join"));
hashJoinLookupStringKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
)
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryNumberToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
)
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%scountryIsoCode\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
)
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%scountryNumber\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
)
);
final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();
final Map<String, String> countryNumberToNameMap = JoinTestHelper.createCountryNumberToNameLookup().getMap();
final ExprMacroTable exprMacroTable = new ExprMacroTable(
ImmutableList.of(
new LookupExprMacro(
lookupName -> {
if (LOOKUP_COUNTRY_CODE_TO_NAME.equals(lookupName)) {
return new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryCodeToNameMap, false)
);
} else if (LOOKUP_COUNTRY_NUMBER_TO_NAME.equals(lookupName)) {
return new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryNumberToNameMap, false)
);
} else {
return null;
}
}
)
)
);
lookupVirtualColumns = VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn(
LOOKUP_COUNTRY_CODE_TO_NAME,
"lookup(countryIsoCode, '" + LOOKUP_COUNTRY_CODE_TO_NAME + "')",
ValueType.STRING,
exprMacroTable
),
new ExpressionVirtualColumn(
LOOKUP_COUNTRY_NUMBER_TO_NAME,
"lookup(countryNumber, '" + LOOKUP_COUNTRY_NUMBER_TO_NAME + "')",
ValueType.STRING,
exprMacroTable
)
)
);
}
private static String getLastValue(final Sequence<Cursor> cursors, final String dimension)
{
return cursors.map(
cursor -> {
final DimensionSelector selector = cursor.getColumnSelectorFactory()
.makeDimensionSelector(DefaultDimensionSpec.of(dimension));
if (selector.getValueCardinality() < 0) {
String lastValue = null;
while (!cursor.isDone()) {
final IndexedInts row = selector.getRow();
final int sz = row.size();
for (int i = 0; i < sz; i++) {
lastValue = selector.lookupName(row.get(i));
}
cursor.advance();
}
return lastValue;
} else {
int lastValue = -1;
while (!cursor.isDone()) {
final IndexedInts row = selector.getRow();
final int sz = row.size();
for (int i = 0; i < sz; i++) {
lastValue = row.get(i);
}
cursor.advance();
}
return selector.lookupName(lastValue);
}
}
).accumulate(null, (acc, in) -> in);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void baseSegment(Blackhole blackhole)
{
final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "countryIsoCode"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void baseSegmentWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors(
new SelectorDimFilter("countryIsoCode", "CA", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "countryIsoCode"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinLookupStringKey(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinLookupStringKeySegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.v"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinLookupStringKeyWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinLookupStringKeySegment.asStorageAdapter().makeCursors(
new SelectorDimFilter("c.v", "Canada", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.v"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinLookupLongKey(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinLookupLongKeySegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.v"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinLookupLongKeyWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinLookupLongKeySegment.asStorageAdapter().makeCursors(
new SelectorDimFilter("c.v", "Canada", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.v"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinIndexedTableLongKey(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinIndexedTableLongKeySegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.countryName"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinIndexedTableLongKeyWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinIndexedTableLongKeySegment.asStorageAdapter().makeCursors(
new SelectorDimFilter("c.countryName", "Canada", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.countryName"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinIndexedTableStringKey(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinIndexedTableStringKeySegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.countryName"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void joinIndexedTableStringKeyWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = hashJoinIndexedTableStringKeySegment.asStorageAdapter().makeCursors(
new SelectorDimFilter("c.countryName", "Canada", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, "c.countryName"));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lookupVirtualColumnStringKey(Blackhole blackhole)
{
final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
lookupVirtualColumns,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_CODE_TO_NAME));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lookupVirtualColumnStringKeyWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors(
new SelectorDimFilter(LOOKUP_COUNTRY_CODE_TO_NAME, "Canada", null).toFilter(),
Intervals.ETERNITY,
lookupVirtualColumns,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_CODE_TO_NAME));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lookupVirtualColumnLongKey(Blackhole blackhole)
{
final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors(
null,
Intervals.ETERNITY,
lookupVirtualColumns,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_NUMBER_TO_NAME));
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lookupVirtualColumnLongKeyWithFilter(Blackhole blackhole)
{
final Sequence<Cursor> cursors = baseSegment.asStorageAdapter().makeCursors(
new SelectorDimFilter(LOOKUP_COUNTRY_NUMBER_TO_NAME, "Canada", null).toFilter(),
Intervals.ETERNITY,
lookupVirtualColumns,
Granularities.ALL,
false,
null
);
blackhole.consume(getLastValue(cursors, LOOKUP_COUNTRY_NUMBER_TO_NAME));
}
}

View File

@ -227,33 +227,41 @@ public class LookupJoinMatcher implements JoinMatcher
return;
}
Iterator<Supplier<String>> keySupplierIterator = keySuppliers.iterator();
String theKey = keySupplierIterator.next().get();
if (theKey == null) {
currentEntry.set(null);
return;
}
// In order to match, all keySuppliers must return the same string, which must be a key in the lookup.
String theKey = null;
for (Supplier<String> keySupplier : keySuppliers) {
final String key = keySupplier.get();
if (key == null || (theKey != null && !theKey.equals(key))) {
while (keySupplierIterator.hasNext()) {
if (!theKey.equals(keySupplierIterator.next().get())) {
currentEntry.set(null);
return;
} else {
theKey = key;
}
}
// All keySuppliers matched. Check if they are actually in the lookup.
final String theValue = extractor.apply(theKey);
checkInLookup(theKey);
}
}
if (theValue != null) {
assert theKey != null;
currentEntry.set(Pair.of(theKey, theValue));
private void checkInLookup(String theKey)
{
// All keySuppliers matched. Check if they are actually in the lookup.
final String theValue = extractor.apply(theKey);
if (matchedKeys != null) {
matchedKeys.add(theKey);
}
} else {
currentEntry.set(null);
if (theValue != null) {
assert theKey != null;
currentEntry.set(Pair.of(theKey, theValue));
if (matchedKeys != null) {
matchedKeys.add(theKey);
}
} else {
currentEntry.set(null);
}
}

View File

@ -41,6 +41,7 @@ import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinMatcher;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
@ -80,7 +81,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
this.conditionMatchers = condition.getEquiConditions()
.stream()
.map(eq -> makeConditionMatcher(table, leftSelectorFactory, eq))
.collect(Collectors.toList());
.collect(Collectors.toCollection(ArrayList::new));
} else {
throw new IAE(
"Cannot build hash-join matcher on non-equi-join condition: %s",

View File

@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
@ -108,14 +109,23 @@ public class IndexBuilder
}
public QueryableIndex buildMMappedIndex()
{
ColumnConfig noCacheColumnConfig = () -> 0;
return buildMMappedIndex(noCacheColumnConfig);
}
public QueryableIndex buildMMappedIndex(ColumnConfig columnConfig)
{
Preconditions.checkNotNull(indexMerger, "indexMerger");
Preconditions.checkNotNull(tmpDir, "tmpDir");
try (final IncrementalIndex incrementalIndex = buildIncrementalIndex()) {
return TestHelper.getTestIndexIO().loadIndex(
return TestHelper.getTestIndexIO(columnConfig).loadIndex(
indexMerger.persist(
incrementalIndex,
new File(tmpDir, StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))),
new File(
tmpDir,
StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))
),
indexSpec,
null
)

View File

@ -62,17 +62,13 @@ public class TestHelper
public static IndexIO getTestIndexIO()
{
return new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
ColumnConfig noCacheColumnConfig = () -> 0;
return getTestIndexIO(noCacheColumnConfig);
}
public static IndexIO getTestIndexIO(ColumnConfig columnConfig)
{
return new IndexIO(JSON_MAPPER, columnConfig);
}
public static ObjectMapper makeJsonMapper()