diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/timecompare/TimeCompareBenchmark.java new file mode 100644 index 00000000000..97522d5d921 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/query/timecompare/TimeCompareBenchmark.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.query.timecompare; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import io.druid.benchmark.datagen.BenchmarkDataGenerator; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.benchmark.query.QueryBenchmarkUtil; +import io.druid.collections.StupidPool; +import io.druid.data.input.InputRow; +import io.druid.hll.HyperLogLogHash; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.logger.Logger; +import io.druid.math.expr.ExprMacroTable; +import io.druid.offheap.OffheapBufferGenerator; +import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.PerSegmentOptimizingQueryRunner; +import io.druid.query.PerSegmentQueryOptimizationContext; +import io.druid.query.Query; +import io.druid.query.QueryPlus; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.query.Result; +import io.druid.query.SegmentDescriptor; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.FilteredAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.filter.IntervalDimFilter; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.query.topn.TopNQueryBuilder; +import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryQueryToolChest; +import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.query.topn.TopNResultValue; +import 
io.druid.segment.IndexIO; +import io.druid.segment.IndexMergerV9; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.serde.ComplexMetrics; +import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.commons.io.FileUtils; +import org.joda.time.Interval; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 50) +@Measurement(iterations = 200) +public class TimeCompareBenchmark +{ + @Param({"10"}) + private int numSegments; + + @Param({"100000"}) + private int rowsPerSegment; + + @Param({"100"}) + private int threshold; + + protected static final Map scriptDoubleSum = Maps.newHashMap(); + static { + scriptDoubleSum.put("fnAggregate", "function aggregate(current, a) { return current + a }"); + scriptDoubleSum.put("fnReset", "function reset() { return 0 }"); + scriptDoubleSum.put("fnCombine", "function combine(a,b) { return a + b }"); + } + + private static final Logger log = new Logger(TimeCompareBenchmark.class); + private static final int RNG_SEED = 9999; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + private List incIndexes; + private List qIndexes; + + private QueryRunnerFactory topNFactory; + private Query topNQuery; + private QueryRunner topNRunner; + + + private QueryRunnerFactory timeseriesFactory; + private Query timeseriesQuery; + private QueryRunner timeseriesRunner; + + private BenchmarkSchemaInfo schemaInfo; + private File tmpDir; + private Interval[] segmentIntervals; + + private ExecutorService executorService; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(ExprMacroTable.class, ExprMacroTable.nil()); + JSON_MAPPER.setInjectableValues(injectableValues); + + INDEX_IO = new IndexIO( + JSON_MAPPER, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + } + + private static final Map> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); + + private void setupQueries() + { + // queries for the basic schema + Map basicQueries = new LinkedHashMap<>(); + BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic"); + + 
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); + + long startMillis = basicSchema.getDataInterval().getStartMillis(); + long endMillis = basicSchema.getDataInterval().getEndMillis(); + long half = (endMillis - startMillis) / 2; + + Interval recent = Intervals.utc(half, endMillis); + Interval previous = Intervals.utc(startMillis, half); + + log.info("Recent interval: " + recent); + log.info("Previous interval: " + previous); + + { // basic.topNTimeCompare + List queryAggs = new ArrayList<>(); + queryAggs.add( + new FilteredAggregatorFactory( + //jsAgg1, + new LongSumAggregatorFactory( + "sumLongSequential", "sumLongSequential" + ), + new IntervalDimFilter( + Column.TIME_COLUMN_NAME, + Collections.singletonList(recent), + null + ) + ) + ); + queryAggs.add( + new FilteredAggregatorFactory( + new LongSumAggregatorFactory( + "_cmp_sumLongSequential", "sumLongSequential" + ), + new IntervalDimFilter( + Column.TIME_COLUMN_NAME, + Collections.singletonList(previous), + null + ) + ) + ); + + TopNQueryBuilder queryBuilderA = new TopNQueryBuilder() + .dataSource("blah") + .granularity(Granularities.ALL) + .dimension("dimUniform") + .metric("sumLongSequential") + .intervals(intervalSpec) + .aggregators(queryAggs) + .threshold(threshold); + + topNQuery = queryBuilderA.build(); + topNFactory = new TopNQueryRunnerFactory( + new StupidPool<>( + "TopNBenchmark-compute-bufferPool", + new OffheapBufferGenerator("compute", 250000000), + 0, + Integer.MAX_VALUE + ), + new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()), + QueryBenchmarkUtil.NOOP_QUERYWATCHER + ); + + basicQueries.put("topNTimeCompare", queryBuilderA); + } + { // basic.timeseriesTimeCompare + List queryAggs = new ArrayList<>(); + queryAggs.add( + new FilteredAggregatorFactory( + new LongSumAggregatorFactory( + "sumLongSequential", "sumLongSequential" + ), + new IntervalDimFilter( + Column.TIME_COLUMN_NAME, + Collections.singletonList(recent), + null + ) + ) + ); + queryAggs.add( + new FilteredAggregatorFactory( + new LongSumAggregatorFactory( + "_cmp_sumLongSequential", "sumLongSequential" + ), + new IntervalDimFilter( + Column.TIME_COLUMN_NAME, + Collections.singletonList(previous), + null + ) + ) + ); + + Druids.TimeseriesQueryBuilder timeseriesQueryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("blah") + .granularity(Granularities.ALL) + .intervals(intervalSpec) + .aggregators(queryAggs) + .descending(false); + + timeseriesQuery = timeseriesQueryBuilder.build(); + timeseriesFactory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest( + QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() + ), + new TimeseriesQueryEngine(), + QueryBenchmarkUtil.NOOP_QUERYWATCHER + ); + } + + SCHEMA_QUERY_MAP.put("basic", basicQueries); + } + + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + System.currentTimeMillis()); + + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); + } + + executorService = Execs.multiThreaded(numSegments, "TopNThreadPool"); + + setupQueries(); + + String schemaName = "basic"; + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); + segmentIntervals = new Interval[numSegments]; + + long startMillis = schemaInfo.getDataInterval().getStartMillis(); + long endMillis = schemaInfo.getDataInterval().getEndMillis(); 
+ long partialIntervalMillis = (endMillis - startMillis) / numSegments; + for (int i = 0; i < numSegments; i++) { + long partialEndMillis = startMillis + partialIntervalMillis; + segmentIntervals[i] = Intervals.utc(startMillis, partialEndMillis); + log.info("Segment [%d] with interval [%s]", i, segmentIntervals[i]); + startMillis = partialEndMillis; + } + + incIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + log.info("Generating rows for segment " + i); + + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + i, + segmentIntervals[i], + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row); + } + incIndexes.add(incIndex); + } + + tmpDir = Files.createTempDir(); + log.info("Using temp dir: " + tmpDir.getAbsolutePath()); + + qIndexes = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + File indexFile = INDEX_MERGER_V9.persist( + incIndexes.get(i), + tmpDir, + new IndexSpec(), + null + ); + + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + qIndexes.add(qIndex); + } + + List>> singleSegmentRunners = Lists.newArrayList(); + QueryToolChest toolChest = topNFactory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + topNFactory, + segmentName, + new QueryableIndexSegment(segmentName, qIndexes.get(i)) + ); + singleSegmentRunners.add( + new PerSegmentOptimizingQueryRunner<>( + toolChest.preMergeQueryDecoration(runner), + new PerSegmentQueryOptimizationContext( + new SegmentDescriptor(segmentIntervals[i], "1", 0) + ) + ) + ); + } + + topNRunner = toolChest.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(topNFactory.mergeRunners(executorService, singleSegmentRunners)), + toolChest + ) + ); + + List>> singleSegmentRunnersT = Lists.newArrayList(); + QueryToolChest toolChestT = timeseriesFactory.getToolchest(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + QueryRunner> runner = QueryBenchmarkUtil.makeQueryRunner( + timeseriesFactory, + segmentName, + new QueryableIndexSegment(segmentName, qIndexes.get(i)) + ); + singleSegmentRunnersT.add( + new PerSegmentOptimizingQueryRunner<>( + toolChestT.preMergeQueryDecoration(runner), + new PerSegmentQueryOptimizationContext( + new SegmentDescriptor(segmentIntervals[i], "1", 0) + ) + ) + ); + } + + timeseriesRunner = toolChestT.postMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + toolChestT.mergeResults(timeseriesFactory.mergeRunners(executorService, singleSegmentRunnersT)), + toolChestT + ) + ); + } + + @TearDown + public void tearDown() throws IOException + { + FileUtils.deleteDirectory(tmpDir); + } + + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndexTopN(Blackhole blackhole) + { + Sequence> queryResult = topNRunner.run( + QueryPlus.wrap(topNQuery), + Maps.newHashMap() + ); + List> results = queryResult.toList(); + + for (Result result : results) { + blackhole.consume(result); + } + } + + + @Benchmark 
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexTimeseries(Blackhole blackhole)
+  {
+    Sequence<Result<TimeseriesResultValue>> queryResult = timeseriesRunner.run(
+        QueryPlus.wrap(timeseriesQuery),
+        Maps.newHashMap()
+    );
+    List<Result<TimeseriesResultValue>> results = queryResult.toList();
+
+    for (Result<TimeseriesResultValue> result : results) {
+      blackhole.consume(result);
+    }
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/PerSegmentOptimizingQueryRunner.java b/processing/src/main/java/io/druid/query/PerSegmentOptimizingQueryRunner.java
new file mode 100644
index 00000000000..11e7f2f9e99
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/PerSegmentOptimizingQueryRunner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query;
+
+import io.druid.java.util.common.guava.Sequence;
+
+import java.util.Map;
+
+/**
+ * This runner optimizes queries made on a single segment, using per-segment information,
+ * before submitting the queries to the base runner.
+ *
+ * Example optimizations include adjusting query filters based on per-segment information, such as intervals.
+ *
+ * This query runner should only wrap base query runners that will
+ * be used to query a single segment (i.e., when the query reaches a historical node).
+ *
+ * @param <T>
+ */
+public class PerSegmentOptimizingQueryRunner<T> implements QueryRunner<T>
+{
+  private final QueryRunner<T> base;
+  private final PerSegmentQueryOptimizationContext optimizationContext;
+
+  public PerSegmentOptimizingQueryRunner(
+      QueryRunner<T> base,
+      PerSegmentQueryOptimizationContext optimizationContext
+  )
+  {
+    this.base = base;
+    this.optimizationContext = optimizationContext;
+  }
+
+  @Override
+  public Sequence<T> run(final QueryPlus<T> input, final Map<String, Object> responseContext)
+  {
+    return base.run(
+        input.optimizeForSegment(optimizationContext),
+        responseContext
+    );
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/PerSegmentQueryOptimizationContext.java b/processing/src/main/java/io/druid/query/PerSegmentQueryOptimizationContext.java
new file mode 100644
index 00000000000..993ccf13ce3
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/PerSegmentQueryOptimizationContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query;
+
+/**
+ * Holds information about a single segment that Query objects can use to optimize themselves
+ * when they are run on that single segment.
+ *
+ * @see PerSegmentOptimizingQueryRunner
+ */
+public class PerSegmentQueryOptimizationContext
+{
+  private final SegmentDescriptor segmentDescriptor;
+
+  public PerSegmentQueryOptimizationContext(
+      SegmentDescriptor segmentDescriptor
+  )
+  {
+    this.segmentDescriptor = segmentDescriptor;
+  }
+
+  public SegmentDescriptor getSegmentDescriptor()
+  {
+    return segmentDescriptor;
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java
index 6e8e69eabd3..5212bd5053f 100644
--- a/processing/src/main/java/io/druid/query/Query.java
+++ b/processing/src/main/java/io/druid/query/Query.java
@@ -109,4 +109,9 @@ public interface Query<T>
   String getId();
 
   Query<T> withDataSource(DataSource dataSource);
+
+  default Query<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return this;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java
index f9a33b38c27..c8110bbfc7f 100644
--- a/processing/src/main/java/io/druid/query/QueryPlus.java
+++ b/processing/src/main/java/io/druid/query/QueryPlus.java
@@ -144,4 +144,9 @@ public final class QueryPlus<T>
   {
     return query.getRunner(walker).run(this, context);
   }
+
+  public QueryPlus<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return new QueryPlus<>(query.optimizeForSegment(optimizationContext), queryMetrics, identity);
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
index 65858c5da75..22ec0724719 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
@@ -23,6 +23,7 @@ import io.druid.guice.annotations.ExtensionPoint;
 import io.druid.java.util.common.Cacheable;
 import io.druid.java.util.common.UOE;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.query.PerSegmentQueryOptimizationContext;
 import io.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
@@ -141,6 +142,14 @@ public abstract class AggregatorFactory implements Cacheable
    */
   public abstract int getMaxIntermediateSize();
 
+  /**
+   * Return a potentially optimized form of this AggregatorFactory for per-segment queries.
+   */
+  public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return this;
+  }
+
   /**
    * Merges the list of AggregatorFactory[] (presumable from metadata of some segments being merged) and
    * returns merged AggregatorFactory[] (for the metadata for merged segment).
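Reviewer note: a minimal, hypothetical sketch (not part of this patch) of what the optimizeForSegment() hook enables once the FilteredAggregatorFactory override and SuppressedAggregatorFactory added further down in this diff are in place; it mirrors the PerSegmentQueryOptimizeTest added near the end. Depending on how the segment interval relates to the aggregator's interval filter, the rewrite clips the filter, drops it, or suppresses the aggregator entirely. The class name, metric name, and interval values below are made up for illustration.

// Illustrative sketch only, not part of this patch.
import io.druid.java.util.common.Intervals;
import io.druid.query.PerSegmentQueryOptimizationContext;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.IntervalDimFilter;
import io.druid.segment.column.Column;
import org.joda.time.Interval;

import java.util.Collections;

public class OptimizeForSegmentSketch
{
  public static void main(String[] args)
  {
    // Sums "metric", but only over rows whose __time falls in [1000, 2000).
    AggregatorFactory agg = new FilteredAggregatorFactory(
        new LongSumAggregatorFactory("metric", "metric"),
        new IntervalDimFilter(
            Column.TIME_COLUMN_NAME,
            Collections.singletonList(Intervals.utc(1000, 2000)),
            null
        )
    );

    // Partial overlap: the filter is kept but clipped to the segment, i.e. to [1500, 2000).
    System.out.println(agg.optimizeForSegment(context(Intervals.utc(1500, 2500))));

    // Segment fully covered by the filter: the filter is dropped and the bare LongSum delegate is returned.
    System.out.println(agg.optimizeForSegment(context(Intervals.utc(1200, 1800))));

    // No overlap: the aggregator is wrapped in SuppressedAggregatorFactory and never aggregates.
    System.out.println(agg.optimizeForSegment(context(Intervals.utc(5000, 6000))));
  }

  private static PerSegmentQueryOptimizationContext context(Interval segmentInterval)
  {
    return new PerSegmentQueryOptimizationContext(new SegmentDescriptor(segmentInterval, "0", 0));
  }
}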
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java index eedd09580de..8f7910d275e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java @@ -98,6 +98,9 @@ public class AggregatorUtil public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B; public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C; + // Suppressed aggregator + public static final byte SUPPRESSED_AGG_CACHE_TYPE_ID = 0x2D; + /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg * diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index 686b7934532..c3dfdc4d750 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -21,12 +21,17 @@ package io.druid.query.aggregation; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import io.druid.query.PerSegmentQueryOptimizationContext; import io.druid.query.filter.DimFilter; +import io.druid.query.filter.IntervalDimFilter; import io.druid.query.filter.ValueMatcher; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; import io.druid.segment.filter.Filters; +import org.joda.time.Interval; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -140,6 +145,66 @@ public class FilteredAggregatorFactory extends AggregatorFactory return delegate.getMaxIntermediateSize(); } + @Override + public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext) + { + if (filter instanceof IntervalDimFilter) { + IntervalDimFilter intervalDimFilter = ((IntervalDimFilter) filter); + if (intervalDimFilter.getExtractionFn() != null) { + // no support for extraction functions right now + return this; + } + + if (!intervalDimFilter.getDimension().equals(Column.TIME_COLUMN_NAME)) { + // segment time boundary optimization only applies when we filter on __time + return this; + } + + Interval segmentInterval = optimizationContext.getSegmentDescriptor().getInterval(); + List filterIntervals = intervalDimFilter.getIntervals(); + List excludedFilterIntervals = new ArrayList<>(); + List effectiveFilterIntervals = new ArrayList<>(); + + boolean segmentIsCovered = false; + for (Interval filterInterval : filterIntervals) { + Interval overlap = filterInterval.overlap(segmentInterval); + if (overlap == null) { + excludedFilterIntervals.add(filterInterval); + continue; + } + + if (overlap.equals(segmentInterval)) { + segmentIsCovered = true; + break; + } else { + // clip the overlapping interval to the segment time boundaries + effectiveFilterIntervals.add(overlap); + } + } + + // we can skip applying this filter, everything in the segment will match + if (segmentIsCovered) { + return delegate; + } + + // we can skip this filter, nothing in the segment would match + if (excludedFilterIntervals.size() == filterIntervals.size()) { + return new SuppressedAggregatorFactory(delegate); + } + + return new FilteredAggregatorFactory( + delegate, + new IntervalDimFilter( + intervalDimFilter.getDimension(), + effectiveFilterIntervals, 
+ intervalDimFilter.getExtractionFn() + ) + ); + } else { + return this; + } + } + @JsonProperty public AggregatorFactory getAggregator() { diff --git a/processing/src/main/java/io/druid/query/aggregation/SuppressedAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/SuppressedAggregatorFactory.java new file mode 100644 index 00000000000..1d393ce0eda --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/SuppressedAggregatorFactory.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.query.PerSegmentQueryOptimizationContext; +import io.druid.query.cache.CacheKeyBuilder; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.ColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant for wrapping delegate aggregators for optimization purposes. + * + * The wrapper suppresses the aggregate() method for the underlying delegate, while leaving + * the behavior of other calls unchanged. + * + * This wrapper is meant to be used when an optimization decides that an aggregator can be entirely skipped + * (e.g., a FilteredAggregatorFactory where the filter condition will never match). 
+ */ +public class SuppressedAggregatorFactory extends AggregatorFactory +{ + private final AggregatorFactory delegate; + + public SuppressedAggregatorFactory( + AggregatorFactory delegate + ) + { + this.delegate = delegate; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new SuppressedAggregator(delegate.factorize(metricFactory)); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new SuppressedBufferAggregator(delegate.factorizeBuffered(metricFactory)); + } + + @Override + public Comparator getComparator() + { + return delegate.getComparator(); + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return delegate.combine(lhs, rhs); + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + return delegate.makeAggregateCombiner(); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return delegate.getCombiningFactory(); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + return delegate.getMergingFactory(other); + } + + @Override + public List getRequiredColumns() + { + return delegate.getRequiredColumns(); + } + + @Override + public Object deserialize(Object object) + { + return delegate.deserialize(object); + } + + @Override + public Object finalizeComputation(Object object) + { + return delegate.finalizeComputation(object); + } + + @Override + public String getName() + { + return delegate.getName(); + } + + @Override + public List requiredFields() + { + return delegate.requiredFields(); + } + + @Override + public String getTypeName() + { + return delegate.getTypeName(); + } + + @Override + public int getMaxIntermediateSize() + { + return delegate.getMaxIntermediateSize(); + } + + @Override + public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext) + { + // we are already the result of an optimizeForSegment() call + return this; + } + + @Override + public byte[] getCacheKey() + { + CacheKeyBuilder cacheKeyBuilder = new CacheKeyBuilder(AggregatorUtil.SUPPRESSED_AGG_CACHE_TYPE_ID); + cacheKeyBuilder.appendCacheable(delegate); + return cacheKeyBuilder.build(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SuppressedAggregatorFactory that = (SuppressedAggregatorFactory) o; + return Objects.equals(getDelegate(), that.getDelegate()); + } + + @Override + public int hashCode() + { + return Objects.hash(getDelegate()); + } + + @Override + public String toString() + { + return "SuppressedAggregatorFactory{" + + "delegate=" + delegate + + '}'; + } + + public AggregatorFactory getDelegate() + { + return delegate; + } + + public static class SuppressedAggregator implements Aggregator + { + private final Aggregator delegate; + + public SuppressedAggregator( + Aggregator delegate + ) + { + this.delegate = delegate; + } + + @Override + public void aggregate() + { + //no-op + } + + @Nullable + @Override + public Object get() + { + return delegate.get(); + } + + @Override + public float getFloat() + { + return delegate.getFloat(); + } + + @Override + public long getLong() + { + return delegate.getLong(); + } + + @Override + public double getDouble() + { + return delegate.getDouble(); + } + + @Override + public boolean isNull() + { + return delegate.isNull(); + } + + @Override + public void 
close() + { + delegate.close(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SuppressedAggregator that = (SuppressedAggregator) o; + return Objects.equals(getDelegate(), that.getDelegate()); + } + + @Override + public int hashCode() + { + return Objects.hash(getDelegate()); + } + + @Override + public String toString() + { + return "SuppressedAggregator{" + + "delegate=" + delegate + + '}'; + } + + public Aggregator getDelegate() + { + return delegate; + } + } + + public static class SuppressedBufferAggregator implements BufferAggregator + { + private final BufferAggregator delegate; + + public SuppressedBufferAggregator( + BufferAggregator delegate + ) + { + this.delegate = delegate; + } + + @Override + public void init(ByteBuffer buf, int position) + { + delegate.init(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + // no-op + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return delegate.get(buf, position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return delegate.getFloat(buf, position); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return delegate.getLong(buf, position); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + return delegate.getDouble(buf, position); + } + + @Override + public void close() + { + delegate.close(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + delegate.inspectRuntimeShape(inspector); + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + delegate.relocate(oldPosition, newPosition, oldBuffer, newBuffer); + } + + @Override + public boolean isNull(ByteBuffer buf, int position) + { + return delegate.isNull(buf, position); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SuppressedBufferAggregator that = (SuppressedBufferAggregator) o; + return Objects.equals(getDelegate(), that.getDelegate()); + } + + @Override + public int hashCode() + { + return Objects.hash(getDelegate()); + } + + @Override + public String toString() + { + return "SuppressedBufferAggregator{" + + "delegate=" + delegate + + '}'; + } + + public BufferAggregator getDelegate() + { + return delegate; + } + } +} diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index 2fac18d5669..dcbb03f1372 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.granularity.Granularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; import io.druid.query.Druids; +import io.druid.query.PerSegmentQueryOptimizationContext; import io.druid.query.Queries; import io.druid.query.Query; import io.druid.query.Result; @@ -37,6 +38,7 @@ import io.druid.query.filter.DimFilter; import io.druid.query.spec.QuerySegmentSpec; import io.druid.segment.VirtualColumns; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -153,6 +155,12 @@ public class TimeseriesQuery extends BaseQuery> return 
Druids.TimeseriesQueryBuilder.copy(this).dataSource(dataSource).build(); } + @Override + public Query> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext) + { + return Druids.TimeseriesQueryBuilder.copy(this).aggregators(optimizeAggs(optimizationContext)).build(); + } + @Override public TimeseriesQuery withOverriddenContext(Map contextOverrides) { @@ -170,6 +178,15 @@ public class TimeseriesQuery extends BaseQuery> return Druids.TimeseriesQueryBuilder.copy(this).postAggregators(postAggregatorSpecs).build(); } + private List optimizeAggs(PerSegmentQueryOptimizationContext optimizationContext) + { + List optimizedAggs = new ArrayList<>(); + for (AggregatorFactory aggregatorFactory : aggregatorSpecs) { + optimizedAggs.add(aggregatorFactory.optimizeForSegment(optimizationContext)); + } + return optimizedAggs; + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 3726b7b196f..6897cc5fba3 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; +import io.druid.query.PerSegmentQueryOptimizationContext; import io.druid.query.Queries; import io.druid.query.Query; import io.druid.query.Result; @@ -36,6 +37,7 @@ import io.druid.query.filter.DimFilter; import io.druid.query.spec.QuerySegmentSpec; import io.druid.segment.VirtualColumns; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -183,6 +185,12 @@ public class TopNQuery extends BaseQuery> return new TopNQueryBuilder(this).dataSource(dataSource).build(); } + @Override + public Query> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext) + { + return new TopNQueryBuilder(this).aggregators(optimizeAggs(optimizationContext)).build(); + } + public TopNQuery withThreshold(int threshold) { return new TopNQueryBuilder(this).threshold(threshold).build(); @@ -252,4 +260,13 @@ public class TopNQuery extends BaseQuery> postAggregatorSpecs ); } + + private List optimizeAggs(PerSegmentQueryOptimizationContext optimizationContext) + { + List optimizedAggs = new ArrayList<>(); + for (AggregatorFactory aggregatorFactory : aggregatorSpecs) { + optimizedAggs.add(aggregatorFactory.optimizeForSegment(optimizationContext)); + } + return optimizedAggs; + } } diff --git a/processing/src/test/java/io/druid/query/topn/PerSegmentQueryOptimizeTest.java b/processing/src/test/java/io/druid/query/topn/PerSegmentQueryOptimizeTest.java new file mode 100644 index 00000000000..06a7f46fa24 --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/PerSegmentQueryOptimizeTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import io.druid.java.util.common.Intervals; +import io.druid.query.PerSegmentQueryOptimizationContext; +import io.druid.query.SegmentDescriptor; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.FilteredAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.SuppressedAggregatorFactory; +import io.druid.query.filter.IntervalDimFilter; +import io.druid.segment.column.Column; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class PerSegmentQueryOptimizeTest +{ + @Test + public void testFilteredAggregatorOptimize() + { + LongSumAggregatorFactory longSumAggregatorFactory = new LongSumAggregatorFactory("test", "test"); + + FilteredAggregatorFactory aggregatorFactory = new FilteredAggregatorFactory( + longSumAggregatorFactory, + new IntervalDimFilter( + Column.TIME_COLUMN_NAME, + Collections.singletonList(Intervals.utc(1000, 2000)), + null + ) + ); + + Interval exclude = Intervals.utc(2000, 3000); + Interval include = Intervals.utc(1500, 1600); + Interval partial = Intervals.utc(1500, 2500); + + AggregatorFactory excludedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(exclude)); + AggregatorFactory expectedSuppressedAgg = new SuppressedAggregatorFactory(longSumAggregatorFactory); + Assert.assertEquals(expectedSuppressedAgg, excludedAgg); + + AggregatorFactory includedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(include)); + Assert.assertEquals(longSumAggregatorFactory, includedAgg); + + AggregatorFactory partialAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(partial)); + AggregatorFactory expectedPartialFilteredAgg = new FilteredAggregatorFactory( + longSumAggregatorFactory, + new IntervalDimFilter( + Column.TIME_COLUMN_NAME, + Collections.singletonList(Intervals.utc(1500, 2000)), + null + ) + ); + Assert.assertEquals(expectedPartialFilteredAgg, partialAgg); + } + + @Test + public void testFilteredAggregatorDontOptimizeOnNonTimeColumn() + { + // Filter is not on __time, so no optimizations should be made. 
+ LongSumAggregatorFactory longSumAggregatorFactory = new LongSumAggregatorFactory("test", "test"); + + FilteredAggregatorFactory aggregatorFactory = new FilteredAggregatorFactory( + longSumAggregatorFactory, + new IntervalDimFilter( + "not_time", + Collections.singletonList(Intervals.utc(1000, 2000)), + null + ) + ); + + Interval exclude = Intervals.utc(2000, 3000); + Interval include = Intervals.utc(1500, 1600); + Interval partial = Intervals.utc(1500, 2500); + + AggregatorFactory excludedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(exclude)); + Assert.assertEquals(aggregatorFactory, excludedAgg); + + AggregatorFactory includedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(include)); + Assert.assertEquals(aggregatorFactory, includedAgg); + + AggregatorFactory partialAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(partial)); + Assert.assertEquals(aggregatorFactory, partialAgg); + } + + private PerSegmentQueryOptimizationContext getOptimizationContext(Interval segmentInterval) + { + return new PerSegmentQueryOptimizationContext( + new SegmentDescriptor(segmentInterval, "0", 0) + ); + } +} diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 0a633036742..aac2e860234 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -39,6 +39,8 @@ import io.druid.query.DataSource; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; +import io.druid.query.PerSegmentOptimizingQueryRunner; +import io.druid.query.PerSegmentQueryOptimizationContext; import io.druid.query.Query; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; @@ -280,38 +282,54 @@ public class ServerManager implements QuerySegmentWalker { SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); String segmentId = adapter.getIdentifier(); + + MetricsEmittingQueryRunner metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>( + emitter, + toolChest, + new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor), + QueryMetrics::reportSegmentTime, + queryMetrics -> queryMetrics.segment(segmentId) + ); + + CachingQueryRunner cachingQueryRunner = new CachingQueryRunner<>( + segmentId, + segmentDescriptor, + objectMapper, + cache, + toolChest, + metricsEmittingQueryRunnerInner, + cachingExec, + cacheConfig + ); + + BySegmentQueryRunner bySegmentQueryRunner = new BySegmentQueryRunner<>( + segmentId, + adapter.getDataInterval().getStart(), + cachingQueryRunner + ); + + MetricsEmittingQueryRunner metricsEmittingQueryRunnerOuter = new MetricsEmittingQueryRunner<>( + emitter, + toolChest, + bySegmentQueryRunner, + QueryMetrics::reportSegmentAndCacheTime, + queryMetrics -> queryMetrics.segment(segmentId) + ).withWaitMeasuredFromNow(); + + SpecificSegmentQueryRunner specificSegmentQueryRunner = new SpecificSegmentQueryRunner<>( + metricsEmittingQueryRunnerOuter, + segmentSpec + ); + + PerSegmentOptimizingQueryRunner perSegmentOptimizingQueryRunner = new PerSegmentOptimizingQueryRunner<>( + specificSegmentQueryRunner, + new PerSegmentQueryOptimizationContext(segmentDescriptor) + ); + return new SetAndVerifyContextQueryRunner<>( serverConfig, CPUTimeMetricQueryRunner.safeBuild( - new SpecificSegmentQueryRunner<>( - new 
MetricsEmittingQueryRunner<>( - emitter, - toolChest, - new BySegmentQueryRunner<>( - segmentId, - adapter.getDataInterval().getStart(), - new CachingQueryRunner<>( - segmentId, - segmentDescriptor, - objectMapper, - cache, - toolChest, - new MetricsEmittingQueryRunner<>( - emitter, - toolChest, - new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor), - QueryMetrics::reportSegmentTime, - queryMetrics -> queryMetrics.segment(segmentId) - ), - cachingExec, - cacheConfig - ) - ), - QueryMetrics::reportSegmentAndCacheTime, - queryMetrics -> queryMetrics.segment(segmentId) - ).withWaitMeasuredFromNow(), - segmentSpec - ), + perSegmentOptimizingQueryRunner, toolChest, emitter, cpuTimeAccumulator,
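Reviewer note: end to end, the ServerManager change above wraps each per-segment runner stack in a PerSegmentOptimizingQueryRunner carrying that segment's SegmentDescriptor, so every segment sees a query rewritten for its own interval. The sketch below (not part of this patch) shows what that wrapper does to the query before the base runner sees it; the class name, data source, metric, and intervals are made up, and the stub lambda base runner stands in for the real per-segment stack.

// Illustrative sketch only, not part of this patch.
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.PerSegmentOptimizingQueryRunner;
import io.druid.query.PerSegmentQueryOptimizationContext;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.IntervalDimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.column.Column;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;

public class PerSegmentRunnerSketch
{
  public static void main(String[] args)
  {
    List<AggregatorFactory> aggs = Collections.singletonList(
        new FilteredAggregatorFactory(
            new LongSumAggregatorFactory("metric", "metric"),
            new IntervalDimFilter(
                Column.TIME_COLUMN_NAME,
                Collections.singletonList(Intervals.utc(1000, 2000)),
                null
            )
        )
    );

    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
        .dataSource("blah")
        .granularity(Granularities.ALL)
        .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.utc(0, 4000))))
        .aggregators(aggs)
        .build();

    // Stub base runner: it only prints the aggregators of the query it receives.
    QueryRunner<Result<TimeseriesResultValue>> base = (queryPlus, responseContext) -> {
      // For a segment fully inside the filter interval, the rewritten query carries the bare
      // LongSumAggregatorFactory instead of the FilteredAggregatorFactory built above.
      System.out.println(((TimeseriesQuery) queryPlus.getQuery()).getAggregatorSpecs());
      return Sequences.empty();
    };

    QueryRunner<Result<TimeseriesResultValue>> perSegment = new PerSegmentOptimizingQueryRunner<>(
        base,
        new PerSegmentQueryOptimizationContext(new SegmentDescriptor(Intervals.utc(1200, 1800), "0", 0))
    );

    perSegment.run(QueryPlus.wrap(query), new HashMap<>()).toList();
  }
}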