diff --git a/docs/querying/segmentmetadataquery.md b/docs/querying/segmentmetadataquery.md
index 6f9d447354d..22176ee2647 100644
--- a/docs/querying/segmentmetadataquery.md
+++ b/docs/querying/segmentmetadataquery.md
@@ -62,7 +62,8 @@ There are several main parts to a segment metadata query:
 |merge|Merge all individual segment metadata results into a single result|no|
 |context|See [Context](../querying/query-context.md)|no|
 |analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "interval", "minmax"], but can be overridden with using the [segment metadata query config](../configuration/index.md#segmentmetadata-query-config). See section [analysisTypes](#analysistypes) for more details.|no|
-|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no|
+|aggregatorMergeStrategy|The strategy Druid uses to merge aggregators across segments. When the `aggregators` analysis type is enabled, `aggregatorMergeStrategy` defaults to `strict`. Possible values are `strict`, `lenient`, and `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.|no|
+|lenientAggregatorMerge|Deprecated. Use the `aggregatorMergeStrategy` property instead. If true, and if the `aggregators` analysis type is enabled, Druid merges aggregators leniently.|no|

 The format of the result is:

@@ -185,7 +186,7 @@ Currently, there is no API for retrieving this information.

 * `aggregators` in the result will contain the list of aggregators usable for querying metric columns. This may be
 null if the aggregators are unknown or unmergeable (if merging is enabled).

-* Merging can be strict or lenient. See *lenientAggregatorMerge* below for details.
+* Merging can be `strict`, `lenient`, or `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.

 * The form of the result is a map of column name to aggregator.

@@ -194,15 +195,20 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).

 * `rollup` in the result is true/false/null.
 * When merging is enabled, if some are rollup, others are not, result is null.

-## lenientAggregatorMerge
+### aggregatorMergeStrategy

 Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
-two segments use incompatible aggregators for the same column (e.g. longSum changed to doubleSum).
+two segments use incompatible aggregators for the same column, such as `longSum` changed to `doubleSum`.
+Druid supports the following aggregator merge strategies:

-Aggregators can be merged strictly (the default) or leniently. With strict merging, if there are any segments
-with unknown aggregators, or any conflicts of any kind, the merged aggregators list will be `null`. With lenient
-merging, segments with unknown aggregators will be ignored, and conflicts between aggregators will only null out
-the aggregator for that particular column.
+- `strict`: If there are any segments with unknown aggregators or any conflicts of any kind, the merged aggregators
+  list is `null`.
+- `lenient`: Druid ignores segments with unknown aggregators. Conflicts between aggregators set the aggregator for
+  that particular column to null.
+- `latest`: In the event of conflicts between segments, Druid selects the aggregator from the most recent segment + for that particular column. -In particular, with lenient merging, it is possible for an individual column's aggregator to be `null`. This will not -occur with strict merging. + +### lenientAggregatorMerge (deprecated) + +Deprecated. Use [`aggregatorMergeStrategy`](#aggregatormergestrategy) instead. diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index 10dfde91b5f..24c1f3ddc4f 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -36,6 +36,7 @@ import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy; import org.apache.druid.query.metadata.metadata.ColumnIncluderator; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.query.scan.ScanQuery; @@ -659,6 +660,7 @@ public class Druids private EnumSet analysisTypes; private Boolean merge; private Boolean lenientAggregatorMerge; + private AggregatorMergeStrategy aggregatorMergeStrategy; private Boolean usingDefaultInterval; private Map context; @@ -670,6 +672,7 @@ public class Druids analysisTypes = null; merge = null; lenientAggregatorMerge = null; + aggregatorMergeStrategy = null; usingDefaultInterval = null; context = null; } @@ -684,7 +687,8 @@ public class Druids context, analysisTypes, usingDefaultInterval, - lenientAggregatorMerge + lenientAggregatorMerge, + aggregatorMergeStrategy ); } @@ -696,7 +700,7 @@ public class Druids .toInclude(query.getToInclude()) .analysisTypes(query.getAnalysisTypes()) .merge(query.isMerge()) - .lenientAggregatorMerge(query.isLenientAggregatorMerge()) + .aggregatorMergeStrategy(query.getAggregatorMergeStrategy()) .usingDefaultInterval(query.isUsingDefaultInterval()) .context(query.getContext()); } @@ -761,12 +765,19 @@ public class Druids return this; } + @Deprecated public SegmentMetadataQueryBuilder lenientAggregatorMerge(boolean lenientAggregatorMerge) { this.lenientAggregatorMerge = lenientAggregatorMerge; return this; } + public SegmentMetadataQueryBuilder aggregatorMergeStrategy(AggregatorMergeStrategy aggregatorMergeStrategy) + { + this.aggregatorMergeStrategy = aggregatorMergeStrategy; + return this; + } + public SegmentMetadataQueryBuilder usingDefaultInterval(boolean usingDefaultInterval) { this.usingDefaultInterval = usingDefaultInterval; diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 9041db0c6a2..655b95e503f 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -31,6 +31,8 @@ import com.google.common.collect.Sets; import com.google.inject.Inject; import org.apache.druid.common.guava.CombiningSequence; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.JodaUtils; import 
org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Comparators; @@ -50,15 +52,16 @@ import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.timeline.LogicalSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -139,10 +142,10 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest createMergeFn(Query query) { return (arg1, arg2) -> mergeAnalyses( - Iterables.getFirst(query.getDataSource().getTableNames(), null), + query.getDataSource().getTableNames(), arg1, arg2, - ((SegmentMetadataQuery) query).isLenientAggregatorMerge() + ((SegmentMetadataQuery) query).getAggregatorMergeStrategy() ); } @@ -205,7 +208,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest dataSources, SegmentAnalysis arg1, SegmentAnalysis arg2, - boolean lenientAggregatorMerge + AggregatorMergeStrategy aggregatorMergeStrategy ) { if (arg1 == null) { @@ -268,16 +270,30 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest id1.getPartitionNum())) { + mergedSegmentId = SegmentId.merged(dataSource, id2.getInterval(), id2.getPartitionNum()); + final SegmentAnalysis tmp = arg1; + arg1 = arg2; + arg2 = tmp; + } else { + mergedSegmentId = SegmentId.merged(dataSource, id1.getInterval(), id1.getPartitionNum()); + } + break; } } @@ -309,7 +325,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest aggregators = new HashMap<>(); - if (lenientAggregatorMerge) { + if (AggregatorMergeStrategy.LENIENT == aggregatorMergeStrategy) { // Merge each aggregator individually, ignoring nulls for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) { if (analysis.getAggregators() != null) { @@ -331,7 +347,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest entry : analysis.getAggregators().entrySet()) { + final String aggregatorName = entry.getKey(); + final AggregatorFactory aggregator = entry.getValue(); + aggregators.putIfAbsent(aggregatorName, aggregator); + } + } + } + } else { + throw DruidException.defensive("[%s] merge strategy is not implemented.", aggregatorMergeStrategy); } final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec( @@ -369,7 +399,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest private final boolean merge; private final boolean usingDefaultInterval; private final EnumSet analysisTypes; - private final boolean lenientAggregatorMerge; + private final AggregatorMergeStrategy aggregatorMergeStrategy; @JsonCreator public SegmentMetadataQuery( @@ -93,7 +93,8 @@ public class SegmentMetadataQuery extends BaseQuery @JsonProperty("context") Map context, @JsonProperty("analysisTypes") EnumSet analysisTypes, @JsonProperty("usingDefaultInterval") Boolean useDefaultInterval, - @JsonProperty("lenientAggregatorMerge") Boolean 
lenientAggregatorMerge
+      @Deprecated @JsonProperty("lenientAggregatorMerge") Boolean lenientAggregatorMerge,
+      @JsonProperty("aggregatorMergeStrategy") AggregatorMergeStrategy aggregatorMergeStrategy
   )
   {
     super(dataSource, querySegmentSpec == null ? DEFAULT_SEGMENT_SPEC : querySegmentSpec, false, context);
@@ -106,11 +107,28 @@ public class SegmentMetadataQuery extends BaseQuery
     this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
     this.merge = merge == null ? false : merge;
     this.analysisTypes = analysisTypes;
-    Preconditions.checkArgument(
-        dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource,
-        "SegmentMetadataQuery only supports table or union datasource"
-    );
-    this.lenientAggregatorMerge = lenientAggregatorMerge == null ? false : lenientAggregatorMerge;
+    if (!(dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource)) {
+      throw InvalidInput.exception("Invalid dataSource type [%s]. "
+                                       + "SegmentMetadataQuery only supports table or union datasources.", dataSource);
+    }
+    // Validate that the user specifies at most one of the two properties. While the deprecated property is still
+    // supported in the API, we set the new member variable from either the old or the new property, so there is a
+    // single source of truth for consumers of this class variable. The defaults preserve backwards compatibility.
+    // In a future release (28.0+), we can remove the deprecated property lenientAggregatorMerge.
+    if (lenientAggregatorMerge != null && aggregatorMergeStrategy != null) {
+      throw InvalidInput.exception("Both lenientAggregatorMerge [%s] and aggregatorMergeStrategy [%s] parameters cannot be set."
+                                       + " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.",
+                                   lenientAggregatorMerge, aggregatorMergeStrategy);
+    }
+    if (lenientAggregatorMerge != null) {
+      this.aggregatorMergeStrategy = lenientAggregatorMerge
+                                     ?
AggregatorMergeStrategy.LENIENT + : AggregatorMergeStrategy.STRICT; + } else if (aggregatorMergeStrategy != null) { + this.aggregatorMergeStrategy = aggregatorMergeStrategy; + } else { + this.aggregatorMergeStrategy = AggregatorMergeStrategy.STRICT; + } } @JsonProperty @@ -156,9 +174,9 @@ public class SegmentMetadataQuery extends BaseQuery } @JsonProperty - public boolean isLenientAggregatorMerge() + public AggregatorMergeStrategy getAggregatorMergeStrategy() { - return lenientAggregatorMerge; + return aggregatorMergeStrategy; } public boolean analyzingInterval() @@ -237,7 +255,7 @@ public class SegmentMetadataQuery extends BaseQuery ", merge=" + merge + ", usingDefaultInterval=" + usingDefaultInterval + ", analysisTypes=" + analysisTypes + - ", lenientAggregatorMerge=" + lenientAggregatorMerge + + ", aggregatorMergeStrategy=" + aggregatorMergeStrategy + '}'; } @@ -256,9 +274,9 @@ public class SegmentMetadataQuery extends BaseQuery SegmentMetadataQuery that = (SegmentMetadataQuery) o; return merge == that.merge && usingDefaultInterval == that.usingDefaultInterval && - lenientAggregatorMerge == that.lenientAggregatorMerge && Objects.equals(toInclude, that.toInclude) && - Objects.equals(analysisTypes, that.analysisTypes); + Objects.equals(analysisTypes, that.analysisTypes) && + Objects.equals(aggregatorMergeStrategy, that.aggregatorMergeStrategy); } @Override @@ -270,7 +288,7 @@ public class SegmentMetadataQuery extends BaseQuery merge, usingDefaultInterval, analysisTypes, - lenientAggregatorMerge + aggregatorMergeStrategy ); } } diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentId.java b/processing/src/main/java/org/apache/druid/timeline/SegmentId.java index 99d17c4405a..c08874844bc 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentId.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentId.java @@ -231,6 +231,16 @@ public final class SegmentId implements Comparable } } + + /** + * Creates a merged SegmentId for the given data source, interval and partition number. Used when segments are + * merged. + */ + public static SegmentId merged(String dataSource, Interval interval, int partitionNum) + { + return of(dataSource, interval, "merged", partitionNum); + } + /** * Creates a dummy SegmentId with the given data source. This method is useful in benchmark and test code. 
*/ diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java index 4fdbe950c33..796efc75493 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java @@ -249,7 +249,15 @@ public class SegmentAnalyzerTest extends InitializedNullHandlingTest ); final SegmentMetadataQuery query = new SegmentMetadataQuery( - new TableDataSource("test"), new LegacySegmentSpec("2011/2012"), null, null, null, analyses, false, false + new TableDataSource("test"), + new LegacySegmentSpec("2011/2012"), + null, + null, + null, + analyses, + false, + null, + null ); return runner.run(QueryPlus.wrap(query)).toList(); } diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index f6a8e8b4979..4e77087c796 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -23,9 +23,13 @@ package org.apache.druid.query.metadata; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -33,6 +37,7 @@ import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -40,11 +45,14 @@ import org.apache.druid.query.spec.LegacySegmentSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ValueType; import org.apache.druid.timeline.LogicalSegment; +import org.apache.druid.timeline.SegmentId; +import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -53,18 +61,23 @@ import java.util.stream.Collectors; public class SegmentMetadataQueryQueryToolChestTest { + private static final DataSource TEST_DATASOURCE = new TableDataSource("dummy"); + private static final SegmentId TEST_SEGMENT_ID1 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2020-01-01/2020-01-02"), "test", 0); + private static final SegmentId TEST_SEGMENT_ID2 = SegmentId.of(TEST_DATASOURCE.toString(), 
Intervals.of("2021-01-01/2021-01-02"), "test", 0); + @Test public void testCacheStrategy() throws Exception { SegmentMetadataQuery query = new SegmentMetadataQuery( - new TableDataSource("dummy"), + TEST_DATASOURCE, new LegacySegmentSpec("2015-01-01/2015-01-02"), null, null, null, null, false, - false + null, + AggregatorMergeStrategy.STRICT ); CacheStrategy strategy = @@ -76,7 +89,7 @@ public class SegmentMetadataQueryQueryToolChestTest Assert.assertArrayEquals(expectedKey, actualKey); SegmentAnalysis result = new SegmentAnalysis( - "testSegment", + TEST_SEGMENT_ID1.toString(), ImmutableList.of(Intervals.of("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")), new LinkedHashMap<>( ImmutableMap.of( @@ -119,7 +132,7 @@ public class SegmentMetadataQueryQueryToolChestTest public void testMergeAggregators() { final SegmentAnalysis analysis1 = new SegmentAnalysis( - "id", + TEST_SEGMENT_ID1.toString(), null, new LinkedHashMap<>(), 0, @@ -133,7 +146,7 @@ public class SegmentMetadataQueryQueryToolChestTest null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( - "id", + TEST_SEGMENT_ID2.toString(), null, new LinkedHashMap<>(), 0, @@ -148,20 +161,154 @@ public class SegmentMetadataQueryQueryToolChestTest ); Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + null, + new LinkedHashMap<>(), + 0, + 0, ImmutableMap.of( "foo", new LongSumAggregatorFactory("foo", "foo"), "bar", new DoubleSumAggregatorFactory("bar", "bar"), "baz", new DoubleSumAggregatorFactory("baz", "baz") ), - mergeStrict(analysis1, analysis2).getAggregators() + null, + null, + null + ), + mergeStrict(analysis1, analysis2) ); + Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + null, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + null, + null, + null + ), + mergeLenient(analysis1, analysis2) + ); + + Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + null, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + null, + null, + null + ), + mergeLatest(analysis1, analysis2) + ); + } + + @Test + public void testMergeAggregatorsWithIntervals() + { + final SegmentAnalysis analysis1 = new SegmentAnalysis( + TEST_SEGMENT_ID1.toString(), + ImmutableList.of(TEST_SEGMENT_ID1.getInterval()), + new LinkedHashMap<>(), + 0, + 0, ImmutableMap.of( "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), "baz", new DoubleSumAggregatorFactory("baz", "baz") ), - mergeLenient(analysis1, analysis2).getAggregators() + null, + null, + null + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + TEST_SEGMENT_ID2.toString(), + ImmutableList.of(TEST_SEGMENT_ID2.getInterval()), + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ), + null, + null, + null + ); + + final List expectedIntervals = new ArrayList<>(); + expectedIntervals.addAll(analysis1.getIntervals()); + expectedIntervals.addAll(analysis2.getIntervals()); + + Assert.assertEquals( + new SegmentAnalysis( + 
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + expectedIntervals, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + null, + null, + null + ), + mergeStrict(analysis1, analysis2) + ); + + Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + expectedIntervals, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + null, + null, + null + ), + mergeLenient(analysis1, analysis2) + ); + + Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + expectedIntervals, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + null, + null, + null + ), + mergeLatest(analysis1, analysis2) ); } @@ -169,7 +316,7 @@ public class SegmentMetadataQueryQueryToolChestTest public void testMergeAggregatorsOneNull() { final SegmentAnalysis analysis1 = new SegmentAnalysis( - "id", + TEST_SEGMENT_ID1.toString(), null, new LinkedHashMap<>(), 0, @@ -180,7 +327,7 @@ public class SegmentMetadataQueryQueryToolChestTest null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( - "id", + TEST_SEGMENT_ID2.toString(), null, new LinkedHashMap<>(), 0, @@ -194,13 +341,55 @@ public class SegmentMetadataQueryQueryToolChestTest null ); - Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators()); Assert.assertEquals( - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + null, + new LinkedHashMap<>(), + 0, + 0, + null, + null, + null, + null ), - mergeLenient(analysis1, analysis2).getAggregators() + mergeStrict(analysis1, analysis2) + ); + + Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + null, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ), + null, + null, + null + ), + mergeLenient(analysis1, analysis2) + ); + + Assert.assertEquals( + new SegmentAnalysis( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", + null, + new LinkedHashMap<>(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ), + null, + null, + null + ), + mergeLatest(analysis1, analysis2) ); } @@ -208,7 +397,7 @@ public class SegmentMetadataQueryQueryToolChestTest public void testMergeAggregatorsAllNull() { final SegmentAnalysis analysis1 = new SegmentAnalysis( - "id", + TEST_SEGMENT_ID1.toString(), null, new LinkedHashMap<>(), 0, @@ -219,7 +408,7 @@ public class SegmentMetadataQueryQueryToolChestTest null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( - "id", + TEST_SEGMENT_ID2.toString(), null, new LinkedHashMap<>(), 0, @@ -230,15 +419,57 @@ public class SegmentMetadataQueryQueryToolChestTest null ); - Assert.assertNull(mergeStrict(analysis1, 
analysis2).getAggregators());
-    Assert.assertNull(mergeLenient(analysis1, analysis2).getAggregators());
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            null,
+            null,
+            null,
+            null
+        ),
+        mergeStrict(analysis1, analysis2)
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            null,
+            null,
+            null,
+            null
+        ),
+        mergeLenient(analysis1, analysis2)
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            null,
+            null,
+            null,
+            null
+        ),
+        mergeLatest(analysis1, analysis2)
+    );
   }

   @Test
   public void testMergeAggregatorsConflict()
   {
     final SegmentAnalysis analysis1 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID1.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -252,7 +483,7 @@ public class SegmentMetadataQueryQueryToolChestTest
         null
     );
     final SegmentAnalysis analysis2 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID2.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -271,16 +502,361 @@ public class SegmentMetadataQueryQueryToolChestTest
     expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
     expectedLenient.put("bar", null);
     expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
-    Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
-    Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2).getAggregators());
-    // Simulate multi-level merge
     Assert.assertEquals(
-        expectedLenient,
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            null,
+            null,
+            null,
+            null
+        ),
+        mergeStrict(analysis1, analysis2)
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            expectedLenient,
+            null,
+            null,
+            null
+        ),
+        mergeLenient(analysis1, analysis2)
+    );
+
+    // Simulate multi-level lenient merge
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            expectedLenient,
+            null,
+            null,
+            null
+        ),
         mergeLenient(
             mergeLenient(analysis1, analysis2),
             mergeLenient(analysis1, analysis2)
-        ).getAggregators()
+        )
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            ImmutableMap.of(
+                "foo", new LongSumAggregatorFactory("foo", "foo"),
+                "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
+                "baz", new LongMaxAggregatorFactory("baz", "baz")
+            ),
+            null,
+            null,
+            null
+        ),
+        mergeLatest(analysis1, analysis2)
+    );
+
+    // Simulate multi-level latest merge
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            ImmutableMap.of(
+                "foo", new LongSumAggregatorFactory("foo", "foo"),
+                "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
+                "baz", new LongMaxAggregatorFactory("baz", "baz")
+            ),
+            null,
+            null,
+            null
+        ),
+        mergeLatest(
+            mergeLatest(analysis1, analysis2),
+            mergeLatest(analysis1, analysis2)
+        )
+    );
+  }
+
+  @Test
+  public void testMergeAggregatorsConflictWithDifferentOrder()
+  {
+    final SegmentAnalysis analysis1 = new SegmentAnalysis(
+        TEST_SEGMENT_ID2.toString(),
+        null,
+        new LinkedHashMap<>(),
+        0,
+        0,
+        ImmutableMap.of(
+            "foo", new LongSumAggregatorFactory("foo", "foo"),
+            "bar", new DoubleSumAggregatorFactory("bar", "bar")
+        ),
+        null,
+        null,
+        null
+    );
+
+    final SegmentAnalysis analysis2 = new SegmentAnalysis(
+        TEST_SEGMENT_ID1.toString(),
+        null,
+        new LinkedHashMap<>(),
+        0,
+        0,
+        ImmutableMap.of(
+            "foo", new LongSumAggregatorFactory("foo", "foo"),
+            "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
+            "baz", new LongMaxAggregatorFactory("baz", "baz")
+        ),
+        null,
+        null,
+        null
+    );
+
+    final Map<String, AggregatorFactory> expectedLenient = new HashMap<>();
+    expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
+    expectedLenient.put("bar", null);
+    expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            null,
+            null,
+            null,
+            null
+        ),
+        mergeStrict(analysis1, analysis2)
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            expectedLenient,
+            null,
+            null,
+            null
+        ),
+        mergeLenient(analysis1, analysis2)
+    );
+
+    // Simulate multi-level lenient merge
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            expectedLenient,
+            null,
+            null,
+            null
+        ),
+        mergeLenient(
+            mergeLenient(analysis1, analysis2),
+            mergeLenient(analysis1, analysis2)
+        )
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            ImmutableMap.of(
+                "foo", new LongSumAggregatorFactory("foo", "foo"),
+                "bar", new DoubleSumAggregatorFactory("bar", "bar"),
+                "baz", new LongMaxAggregatorFactory("baz", "baz")
+            ),
+            null,
+            null,
+            null
+        ),
+        mergeLatest(analysis1, analysis2)
+    );
+
+    // Simulate multi-level latest merge
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            ImmutableMap.of(
+                "foo", new LongSumAggregatorFactory("foo", "foo"),
+                "bar", new DoubleSumAggregatorFactory("bar", "bar"),
+                "baz", new LongMaxAggregatorFactory("baz", "baz")
+            ),
+            null,
+            null,
+            null
+        ),
+        mergeLatest(
+            mergeLatest(analysis1, analysis2),
+            mergeLatest(analysis1, analysis2)
+        )
+    );
+  }
+
+  @Test
+  public void testMergeAggregatorsConflictWithEqualSegmentIntervalsAndDifferentPartitions()
+  {
+    final SegmentId segmentId1 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2023-01-01/2023-01-02"), "test", 1);
+    final SegmentId segmentId2 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2023-01-01/2023-01-02"), "test", 2);
+
+    final SegmentAnalysis analysis1 = new SegmentAnalysis(
+        segmentId1.toString(),
+        null,
+        new LinkedHashMap<>(),
+        0,
+        0,
+        ImmutableMap.of(
+            "foo", new LongSumAggregatorFactory("foo", "foo"),
+            "bar", new DoubleSumAggregatorFactory("bar", "bar")
+        ),
+        null,
+        null,
+        null
+    );
+
+    final SegmentAnalysis analysis2 = new SegmentAnalysis(
+        segmentId2.toString(),
+        null,
+        new LinkedHashMap<>(),
+        0,
+        0,
+        ImmutableMap.of(
+            "foo", new LongSumAggregatorFactory("foo", "foo"),
+            "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
+            "baz", new LongMaxAggregatorFactory("baz", "baz")
+        ),
+        null,
+        null,
+        null
+    );
+
+    final Map<String, AggregatorFactory> expectedLenient = new HashMap<>();
+    expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
+    expectedLenient.put("bar", null);
+    expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            null,
+            null,
+            null,
+            null
+        ),
+        mergeStrict(analysis1, analysis2)
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            expectedLenient,
+            null,
+            null,
+            null
+        ),
+        mergeLenient(analysis1, analysis2)
+    );
+
+    // Simulate multi-level lenient merge
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            expectedLenient,
+            null,
+            null,
+            null
+        ),
+        mergeLenient(
+            mergeLenient(analysis1, analysis2),
+            mergeLenient(analysis1, analysis2)
+        )
+    );
+
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            ImmutableMap.of(
+                "foo", new LongSumAggregatorFactory("foo", "foo"),
+                "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
+                "baz", new LongMaxAggregatorFactory("baz", "baz")
+            ),
+            null,
+            null,
+            null
+        ),
+        mergeLatest(analysis1, analysis2)
+    );
+
+    // Simulate multi-level latest merge
+    Assert.assertEquals(
+        new SegmentAnalysis(
+            "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
+            null,
+            new LinkedHashMap<>(),
+            0,
+            0,
+            ImmutableMap.of(
+                "foo", new LongSumAggregatorFactory("foo", "foo"),
+                "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
+                "baz", new LongMaxAggregatorFactory("baz", "baz")
+            ),
+            null,
+            null,
+            null
+        ),
+        mergeLatest(
+            mergeLatest(analysis1, analysis2),
+            mergeLatest(analysis1, analysis2)
+        )
     );
   }
@@ -333,7 +909,7 @@ public class SegmentMetadataQueryQueryToolChestTest
   public void testMergeRollup()
   {
     final SegmentAnalysis analysis1 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID1.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -344,7 +920,7 @@ public class SegmentMetadataQueryQueryToolChestTest
         null
     );
     final SegmentAnalysis analysis2 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID2.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -355,7 +931,7 @@ public class SegmentMetadataQueryQueryToolChestTest
         false
     );
     final SegmentAnalysis analysis3 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID1.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -366,7 +942,7 @@ public class SegmentMetadataQueryQueryToolChestTest
         false
     );
     final SegmentAnalysis analysis4 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID2.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -377,7 +953,7 @@ public class SegmentMetadataQueryQueryToolChestTest
         true
     );
     final SegmentAnalysis analysis5 = new SegmentAnalysis(
-        "id",
+        TEST_SEGMENT_ID1.toString(),
         null,
         new LinkedHashMap<>(),
         0,
@@ -393,16 +969,87 @@ public class SegmentMetadataQueryQueryToolChestTest
     Assert.assertNull(mergeStrict(analysis2, analysis4).isRollup());
     Assert.assertFalse(mergeStrict(analysis2, analysis3).isRollup());
     Assert.assertTrue(mergeStrict(analysis4, analysis5).isRollup());
+
+    Assert.assertNull(mergeLenient(analysis1, analysis2).isRollup());
+    Assert.assertNull(mergeLenient(analysis1, analysis4).isRollup());
+    Assert.assertNull(mergeLenient(analysis2, analysis4).isRollup());
+    
Assert.assertFalse(mergeLenient(analysis2, analysis3).isRollup()); + Assert.assertTrue(mergeLenient(analysis4, analysis5).isRollup()); + + Assert.assertNull(mergeLatest(analysis1, analysis2).isRollup()); + Assert.assertNull(mergeLatest(analysis1, analysis4).isRollup()); + Assert.assertNull(mergeLatest(analysis2, analysis4).isRollup()); + Assert.assertFalse(mergeLatest(analysis2, analysis3).isRollup()); + Assert.assertTrue(mergeLatest(analysis4, analysis5).isRollup()); + } + + @Test + public void testInvalidMergeAggregatorsWithNullOrEmptyDatasource() + { + final SegmentAnalysis analysis1 = new SegmentAnalysis( + TEST_SEGMENT_ID1.toString(), + null, + new LinkedHashMap<>(), + 0, + 0, + null, + null, + null, + null + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + TEST_SEGMENT_ID2.toString(), + null, + new LinkedHashMap<>(), + 0, + 0, + null, + null, + null, + false + ); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> SegmentMetadataQueryQueryToolChest.mergeAnalyses( + null, + analysis1, + analysis2, + AggregatorMergeStrategy.STRICT + ) + ), + DruidExceptionMatcher + .invalidInput() + .expectMessageIs( + "SegementMetadata queries require at least one datasource.") + ); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> SegmentMetadataQueryQueryToolChest.mergeAnalyses( + ImmutableSet.of(), + analysis1, + analysis2, + AggregatorMergeStrategy.STRICT + ) + ), + DruidExceptionMatcher + .invalidInput() + .expectMessageIs( + "SegementMetadata queries require at least one datasource.") + ); } private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2) { return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( SegmentMetadataQueryQueryToolChest.mergeAnalyses( - null, + TEST_DATASOURCE.getTableNames(), analysis1, analysis2, - false + AggregatorMergeStrategy.STRICT ) ); } @@ -411,10 +1058,22 @@ public class SegmentMetadataQueryQueryToolChestTest { return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( SegmentMetadataQueryQueryToolChest.mergeAnalyses( - null, + TEST_DATASOURCE.getTableNames(), analysis1, analysis2, - true + AggregatorMergeStrategy.LENIENT + ) + ); + } + + private static SegmentAnalysis mergeLatest(SegmentAnalysis analysis1, SegmentAnalysis analysis2) + { + return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( + SegmentMetadataQueryQueryToolChest.mergeAnalyses( + TEST_DATASOURCE.getTableNames(), + analysis1, + analysis2, + AggregatorMergeStrategy.LATEST ) ); } diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java index 0a93dd373ac..0e13b9f5986 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java @@ -20,20 +20,27 @@ package org.apache.druid.query.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import 
org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.BySegmentResultValue;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
@@ -42,22 +49,28 @@ import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
 import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
 import org.apache.druid.query.metadata.metadata.ListColumnIncluderator;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.spec.LegacySegmentSpec;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.LogicalSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -793,7 +806,74 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
         .intervals("2013/2014")
         .toInclude(new ListColumnIncluderator(Collections.singletonList("placement")))
         .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
+        .merge(true) // if the aggregator merge strategy is unspecified, it defaults to strict.
+        .build();
+    TestHelper.assertExpectedObjects(
+        ImmutableList.of(mergedSegmentAnalysis),
+        myRunner.run(QueryPlus.wrap(query)),
+        "failed SegmentMetadata merging query"
+    );
+    exec.shutdownNow();
+  }
+
+  @Test
+  public void testSegmentMetadataQueryWithAggregatorsMergeLenientStrategy()
+  {
+    final Map<String, AggregatorFactory> expectedAggregators = new HashMap<>();
+    for (AggregatorFactory agg : TestIndex.METRIC_AGGS) {
+      expectedAggregators.put(agg.getName(), agg.getCombiningFactory());
+    }
+    SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
+        differentIds ?
"merged" : SegmentId.dummy("testSegment").toString(), + null, + new LinkedHashMap<>( + ImmutableMap.of( + "placement", + new ColumnAnalysis( + ColumnType.STRING, + ValueType.STRING.toString(), + false, + false, + 0, + 0, + NullHandling.defaultStringValue(), + NullHandling.defaultStringValue(), + null + ) + ) + ), + 0, + expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), + expectedAggregators, + null, + null, + null + ); + + QueryToolChest toolChest = FACTORY.getToolchest(); + + ExecutorService exec = Executors.newCachedThreadPool(); + QueryRunner myRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + FACTORY.mergeRunners( + Execs.directExecutor(), + Lists.newArrayList( + toolChest.preMergeQueryDecoration(runner1), + toolChest.preMergeQueryDecoration(runner2) + ) + ) + ), + toolChest + ); + + SegmentMetadataQuery query = Druids + .newSegmentMetadataQueryBuilder() + .dataSource("testing222") + .intervals("2013/2014") + .toInclude(new ListColumnIncluderator(Collections.singletonList("placement"))) + .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS) .merge(true) + .aggregatorMergeStrategy(AggregatorMergeStrategy.LENIENT) .build(); TestHelper.assertExpectedObjects( ImmutableList.of(mergedSegmentAnalysis), @@ -990,9 +1070,13 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest query.getIntervals().get(0) ); Assert.assertEquals(expectedAnalysisTypes, ((SegmentMetadataQuery) query).getAnalysisTypes()); + Assert.assertEquals(AggregatorMergeStrategy.STRICT, ((SegmentMetadataQuery) query).getAggregatorMergeStrategy()); // test serialize and deserialize Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class)); + + // test copy + Assert.assertEquals(query, Druids.SegmentMetadataQueryBuilder.copy((SegmentMetadataQuery) query).build()); } @Test @@ -1004,9 +1088,11 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest + "}"; Query query = MAPPER.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); + Assert.assertTrue(query.getDataSource() instanceof TableDataSource); Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames())); Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0)); Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval()); + Assert.assertEquals(AggregatorMergeStrategy.STRICT, ((SegmentMetadataQuery) query).getAggregatorMergeStrategy()); // test serialize and deserialize Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class)); @@ -1015,6 +1101,51 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest Assert.assertEquals(query, Druids.SegmentMetadataQueryBuilder.copy((SegmentMetadataQuery) query).build()); } + @Test + public void testSerdeWithLatestAggregatorStrategy() throws Exception + { + String queryStr = "{\n" + + " \"queryType\":\"segmentMetadata\",\n" + + " \"dataSource\":\"test_ds\",\n" + + " \"aggregatorMergeStrategy\":\"latest\"\n" + + "}"; + Query query = MAPPER.readValue(queryStr, Query.class); + Assert.assertTrue(query instanceof SegmentMetadataQuery); + Assert.assertTrue(query.getDataSource() instanceof TableDataSource); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames())); + Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0)); + Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval()); + 
Assert.assertEquals(AggregatorMergeStrategy.LATEST, ((SegmentMetadataQuery) query).getAggregatorMergeStrategy()); + + // test serialize and deserialize + Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class)); + + // test copy + Assert.assertEquals(query, Druids.SegmentMetadataQueryBuilder.copy((SegmentMetadataQuery) query).build()); + } + + @Test + public void testSerdeWithBothDeprecatedAndNewParameters() + { + String queryStr = "{\n" + + " \"queryType\":\"segmentMetadata\",\n" + + " \"dataSource\":\"test_ds\",\n" + + " \"lenientAggregatorMerge\":\"true\",\n" + + " \"aggregatorMergeStrategy\":\"lenient\"\n" + + "}"; + + ValueInstantiationException exception = Assert.assertThrows( + ValueInstantiationException.class, + () -> MAPPER.readValue(queryStr, Query.class) + ); + + Assert.assertTrue( + exception.getCause().getMessage().contains( + "Both lenientAggregatorMerge [true] and aggregatorMergeStrategy [lenient] parameters cannot be set. Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated." + ) + ); + } + @Test public void testDefaultIntervalAndFiltering() { @@ -1304,7 +1435,6 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest @Test public void testAnanlysisTypesBeingSet() { - SegmentMetadataQuery query1 = Druids.newSegmentMetadataQueryBuilder() .dataSource("testing") .toInclude(new ListColumnIncluderator(Collections.singletonList("foo"))) @@ -1408,4 +1538,253 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest ); testSegmentMetadataQueryWithDefaultAnalysisMerge("null_column", analysis); } + + @Test + public void testSegmentMetadataQueryWithInvalidDatasourceTypes() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new SegmentMetadataQuery( + InlineDataSource.fromIterable( + ImmutableList.of(new Object[0]), + RowSignature.builder().add("column", ColumnType.STRING).build() + ), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + null + ) + ), + DruidExceptionMatcher + .invalidInput() + .expectMessageIs( + "Invalid dataSource type [InlineDataSource{signature={column:STRING}}]. SegmentMetadataQuery only supports table or union datasources.") + ); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new SegmentMetadataQuery( + new LookupDataSource("lookyloo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + null + ) + ), + DruidExceptionMatcher + .invalidInput() + .expectMessageIs( + "Invalid dataSource type [LookupDataSource{lookupName='lookyloo'}]. SegmentMetadataQuery only supports table or union datasources.") + ); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new SegmentMetadataQuery( + JoinDataSource.create( + new TableDataSource("table1"), + new TableDataSource("table2"), + "j.", + "x == \"j.x\"", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + null + ), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + null + ) + ), + DruidExceptionMatcher + .invalidInput() + .expectMessageIs( + "Invalid dataSource type [JoinDataSource{left=table1, right=table2, rightPrefix='j.', condition=x == \"j.x\", joinType=LEFT, leftFilter=null}]. 
SegmentMetadataQuery only supports table or union datasources.") + ); + } + + @Test + public void testSegmentMetadataQueryWithAggregatorMergeStrictStrategy() + { + // This is the default behavior -- if nothing is specified, the merge strategy is strict. + Assert.assertEquals( + AggregatorMergeStrategy.STRICT, + new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + null + ).getAggregatorMergeStrategy() + ); + + Assert.assertEquals( + AggregatorMergeStrategy.STRICT, + new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + false, + null + ).getAggregatorMergeStrategy() + ); + + Assert.assertEquals( + AggregatorMergeStrategy.STRICT, + new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + AggregatorMergeStrategy.STRICT + ).getAggregatorMergeStrategy() + ); + } + + @Test + public void testSegmentMetadataQueryWithAggregatorMergeLenientStrategy() + { + Assert.assertEquals( + AggregatorMergeStrategy.LENIENT, + new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + true, + null + ).getAggregatorMergeStrategy() + ); + + Assert.assertEquals( + AggregatorMergeStrategy.LENIENT, + new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + AggregatorMergeStrategy.LENIENT + ).getAggregatorMergeStrategy() + ); + } + + @Test + public void testSegmentMetadataQueryWithAggregatorMergeLatestStrategy() + { + Assert.assertEquals( + AggregatorMergeStrategy.LATEST, + new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + null, + AggregatorMergeStrategy.LATEST + ).getAggregatorMergeStrategy() + ); + } + + @Test + public void testSegmentMetadataQueryWithBothDeprecatedAndNewParameter() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + false, + AggregatorMergeStrategy.STRICT + ) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageIs( + "Both lenientAggregatorMerge [false] and aggregatorMergeStrategy [strict] parameters cannot be set." + + " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.") + ); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + true, + AggregatorMergeStrategy.LENIENT + ) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageIs( + "Both lenientAggregatorMerge [true] and aggregatorMergeStrategy [lenient] parameters cannot be set." 
+ + " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.") + ); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new SegmentMetadataQuery( + new TableDataSource("foo"), + new LegacySegmentSpec("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + false, + AggregatorMergeStrategy.LATEST + ) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageIs( + "Both lenientAggregatorMerge [false] and aggregatorMergeStrategy [latest] parameters cannot be set." + + " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.") + ); + } } diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java index b29bfeb4950..48a3acbf1bf 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java @@ -21,7 +21,6 @@ package org.apache.druid.query.metadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; @@ -50,9 +49,6 @@ import java.util.List; @RunWith(Parameterized.class) public class SegmentMetadataUnionQueryTest extends InitializedNullHandlingTest { - static { - NullHandling.initializeForTests(); - } private static final QueryRunnerFactory FACTORY = new SegmentMetadataQueryRunnerFactory( new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()), diff --git a/server/src/test/java/org/apache/druid/server/log/FilteredRequestLoggerTest.java b/server/src/test/java/org/apache/druid/server/log/FilteredRequestLoggerTest.java index b788589e224..8e65c522a3e 100644 --- a/server/src/test/java/org/apache/druid/server/log/FilteredRequestLoggerTest.java +++ b/server/src/test/java/org/apache/druid/server/log/FilteredRequestLoggerTest.java @@ -56,6 +56,7 @@ public class FilteredRequestLoggerTest null, null, null, + null, null ); diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java index 8c865af1664..769a4b8fc7d 100644 --- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java +++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java @@ -241,7 +241,8 @@ public class DumpSegment extends GuiceRunnable null, EnumSet.allOf(SegmentMetadataQuery.AnalysisType.class), false, - false + null, + null ); withOutputStream( new Function() diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java index ea3dc395671..278a9010ebe 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java @@ -932,7 +932,8 @@ public class SegmentMetadataCache ), EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), false, - false + null, + null // we don't care about merging strategy because merge is false ); return queryLifecycleFactory diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java index 0414878b4a5..3d3d73741ad 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java @@ -1309,7 +1309,8 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon queryContext, EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), false, - false + null, + null ); QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class); diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts index 622584e667b..a8f5fecfb24 100644 --- a/web-console/src/utils/sampler.ts +++ b/web-console/src/utils/sampler.ts @@ -312,7 +312,7 @@ export async function sampleForConnect( dataSource, intervals, merge: true, - lenientAggregatorMerge: true, + aggregatorMergeStrategy: 'lenient', analysisTypes: ['aggregators', 'rollup'], }); diff --git a/website/.spelling b/website/.spelling index 63573f94a86..be6441fd7c6 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1799,6 +1799,7 @@ InsensitiveContainsSearchQuerySpec RegexSearchQuerySpec analysisType analysisTypes +aggregatorMergeStrategy lenientAggregatorMerge minmax segmentMetadata
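For reference, a minimal sketch of how a caller builds a segment metadata query with the new property, using only the builder methods and the AggregatorMergeStrategy enum added by this patch. The datasource name and interval are illustrative placeholders:

    import org.apache.druid.query.Druids;
    import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
    import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;

    class AggregatorMergeStrategyUsageSketch
    {
      static SegmentMetadataQuery buildLatestQuery()
      {
        // Merge per-segment metadata and resolve aggregator conflicts in favor of
        // the most recent segment. The deprecated .lenientAggregatorMerge(boolean)
        // builder method still compiles, but the SegmentMetadataQuery constructor
        // rejects queries that set both properties.
        return Druids
            .newSegmentMetadataQueryBuilder()
            .dataSource("wikipedia")             // placeholder datasource
            .intervals("2023-01-01/2023-02-01")  // placeholder interval
            .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
            .merge(true)
            .aggregatorMergeStrategy(AggregatorMergeStrategy.LATEST)
            .build();
      }
    }

Over JSON, the same query carries "aggregatorMergeStrategy": "latest" in the request body, as exercised by testSerdeWithLatestAggregatorStrategy.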
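And a sketch of the observable difference between the three strategies, mirroring the toolchest unit tests above. The two analyses disagree on the aggregator for column "bar" (doubleSum in the older segment, doubleMax in the newer one); the segment ids and the "dummy" datasource are placeholders:

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.ImmutableSet;
    import java.util.LinkedHashMap;
    import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
    import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
    import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
    import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
    import org.apache.druid.query.metadata.metadata.SegmentAnalysis;

    class MergeStrategySemanticsSketch
    {
      static SegmentAnalysis mergeWith(AggregatorMergeStrategy strategy)
      {
        // Older and newer segment analyses that conflict on column "bar".
        final SegmentAnalysis older = new SegmentAnalysis(
            "dummy_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_test_0",
            null, new LinkedHashMap<>(), 0, 0,
            ImmutableMap.of("bar", new DoubleSumAggregatorFactory("bar", "bar")),
            null, null, null
        );
        final SegmentAnalysis newer = new SegmentAnalysis(
            "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_test_0",
            null, new LinkedHashMap<>(), 0, 0,
            ImmutableMap.of("bar", new DoubleMaxAggregatorFactory("bar", "bar")),
            null, null, null
        );
        // STRICT:  the conflict nulls the whole aggregators map.
        // LENIENT: only the conflicted "bar" entry is nulled.
        // LATEST:  "bar" resolves to the doubleMax from the newer (2021) segment.
        return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
            SegmentMetadataQueryQueryToolChest.mergeAnalyses(
                ImmutableSet.of("dummy"), older, newer, strategy
            )
        );
      }
    }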