Add `aggregatorMergeStrategy` property in SegmentMetadata queries (#14560)

* Add aggregatorMergeStrategy property to SegmentMetadataQuery.

- Adds a new property, aggregatorMergeStrategy, to the segmentMetadata query.
aggregatorMergeStrategy supports three merge strategies: the legacy strict and
lenient strategies, and the new latest strategy.
- When aggregators from different segments conflict, the latest strategy picks the
aggregator from the most recent segment, ordered by time.
- Deprecates the lenientAggregatorMerge property. The API validates that the new and
old properties are not both set, and throws an exception if they are.
- When a segmentMetadata query merges segments, the merged result now carries a more
descriptive id of the form <datasource>_<interval>_merged_<partition_number>, similar
to regular segment identifiers. Previously the id was simply "merged".
- Adjusts unit tests to cover the latest strategy and to assert the complete returned
SegmentAnalysis object rather than just the aggregators.

* Don't explicitly set strict strategy in tests

* Apply suggestions from code review

Co-authored-by: Katya Macedo  <38017980+ektravel@users.noreply.github.com>

* Update docs/querying/segmentmetadataquery.md

* Apply suggestions from code review

Co-authored-by: Katya Macedo  <38017980+ektravel@users.noreply.github.com>

---------

Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com>
Abhishek Radhakrishnan 2023-07-13 09:37:36 -07:00 committed by GitHub
parent 450ecd6370
commit f4ee58eaa8
16 changed files with 1254 additions and 87 deletions

View File

@ -62,7 +62,8 @@ There are several main parts to a segment metadata query:
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.md)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "interval", "minmax"], but can be overridden with using the [segment metadata query config](../configuration/index.md#segmentmetadata-query-config). See section [analysisTypes](#analysistypes) for more details.|no|
|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no|
|aggregatorMergeStrategy|The strategy Druid uses to merge aggregators across segments. This property takes effect only when the `aggregators` analysis type is enabled. Possible values are `strict` (default), `lenient`, and `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.|no|
|lenientAggregatorMerge|Deprecated. Use `aggregatorMergeStrategy` property instead. If true, and if the `aggregators` analysis type is enabled, Druid merges aggregators leniently.|no|
The format of the result is:
@ -185,7 +186,7 @@ Currently, there is no API for retrieving this information.
* `aggregators` in the result will contain the list of aggregators usable for querying metric columns. This may be
null if the aggregators are unknown or unmergeable (if merging is enabled).
* Merging can be strict or lenient. See *lenientAggregatorMerge* below for details.
* Merging can be `strict`, `lenient`, or `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.
* The form of the result is a map of column name to aggregator.
@ -194,15 +195,20 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).
* `rollup` in the result is true/false/null.
* When merging is enabled, if some are rollup, others are not, result is null.
## lenientAggregatorMerge
### aggregatorMergeStrategy
Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
two segments use incompatible aggregators for the same column (e.g. longSum changed to doubleSum).
two segments use incompatible aggregators for the same column, such as `longSum` changed to `doubleSum`.
Druid supports the following aggregator merge strategies:
Aggregators can be merged strictly (the default) or leniently. With strict merging, if there are any segments
with unknown aggregators, or any conflicts of any kind, the merged aggregators list will be `null`. With lenient
merging, segments with unknown aggregators will be ignored, and conflicts between aggregators will only null out
the aggregator for that particular column.
- `strict`: If there are any segments with unknown aggregators or any conflicts of any kind, the merged aggregators
list is `null`.
- `lenient`: Druid ignores segments with unknown aggregators. Conflicts between aggregators set the aggregator for that particular column to null.
- `latest`: In the event of conflicts between segments, Druid selects the aggregator from the most recent segment
for that particular column.
In particular, with lenient merging, it is possible for an individual column's aggregator to be `null`. This will not
occur with strict merging.
### lenientAggregatorMerge (deprecated)
Deprecated. Use [`aggregatorMergeStrategy`](#aggregatormergestrategy) instead.
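
For reference, the minimal native query body exercised by the new serde test in this patch sets the property directly; `test_ds` is just the test datasource name:

```json
{
  "queryType": "segmentMetadata",
  "dataSource": "test_ds",
  "aggregatorMergeStrategy": "latest"
}
```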

View File

@ -36,6 +36,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.ColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.scan.ScanQuery;
@ -659,6 +660,7 @@ public class Druids
private EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
private Boolean merge;
private Boolean lenientAggregatorMerge;
private AggregatorMergeStrategy aggregatorMergeStrategy;
private Boolean usingDefaultInterval;
private Map<String, Object> context;
@ -670,6 +672,7 @@ public class Druids
analysisTypes = null;
merge = null;
lenientAggregatorMerge = null;
aggregatorMergeStrategy = null;
usingDefaultInterval = null;
context = null;
}
@ -684,7 +687,8 @@ public class Druids
context,
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
lenientAggregatorMerge,
aggregatorMergeStrategy
);
}
@ -696,7 +700,7 @@ public class Druids
.toInclude(query.getToInclude())
.analysisTypes(query.getAnalysisTypes())
.merge(query.isMerge())
.lenientAggregatorMerge(query.isLenientAggregatorMerge())
.aggregatorMergeStrategy(query.getAggregatorMergeStrategy())
.usingDefaultInterval(query.isUsingDefaultInterval())
.context(query.getContext());
}
@ -761,12 +765,19 @@ public class Druids
return this;
}
@Deprecated
public SegmentMetadataQueryBuilder lenientAggregatorMerge(boolean lenientAggregatorMerge)
{
this.lenientAggregatorMerge = lenientAggregatorMerge;
return this;
}
public SegmentMetadataQueryBuilder aggregatorMergeStrategy(AggregatorMergeStrategy aggregatorMergeStrategy)
{
this.aggregatorMergeStrategy = aggregatorMergeStrategy;
return this;
}
public SegmentMetadataQueryBuilder usingDefaultInterval(boolean usingDefaultInterval)
{
this.usingDefaultInterval = usingDefaultInterval;
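
For completeness, a short usage sketch of the new builder method; the datasource name and interval below are placeholders, and the call pattern mirrors the updated unit tests rather than any specific production code:

```java
import org.apache.druid.query.Druids;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;

// Sketch (not part of the patch): build a segmentMetadata query that resolves
// aggregator conflicts with the new "latest" strategy. The datasource and
// interval are placeholders.
public class LatestStrategyQuerySketch
{
  public static SegmentMetadataQuery buildExampleQuery()
  {
    return Druids
        .newSegmentMetadataQueryBuilder()
        .dataSource("example_datasource")
        .intervals("2023-01-01/2023-02-01")
        .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
        .merge(true)
        .aggregatorMergeStrategy(AggregatorMergeStrategy.LATEST)
        .build();
  }
}
```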

View File

@ -31,6 +31,8 @@ import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
@ -50,15 +52,16 @@ import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@ -139,10 +142,10 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
public BinaryOperator<SegmentAnalysis> createMergeFn(Query<SegmentAnalysis> query)
{
return (arg1, arg2) -> mergeAnalyses(
Iterables.getFirst(query.getDataSource().getTableNames(), null),
query.getDataSource().getTableNames(),
arg1,
arg2,
((SegmentMetadataQuery) query).isLenientAggregatorMerge()
((SegmentMetadataQuery) query).getAggregatorMergeStrategy()
);
}
@ -205,7 +208,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
// need to include query "merge" and "lenientAggregatorMerge" for result level cache key
return new CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query))
.appendBoolean(query.isMerge())
.appendBoolean(query.isLenientAggregatorMerge())
.build();
}
@ -254,10 +256,10 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
@VisibleForTesting
public static SegmentAnalysis mergeAnalyses(
@Nullable String dataSource,
Set<String> dataSources,
SegmentAnalysis arg1,
SegmentAnalysis arg2,
boolean lenientAggregatorMerge
AggregatorMergeStrategy aggregatorMergeStrategy
)
{
if (arg1 == null) {
@ -268,16 +270,30 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
return arg1;
}
// Swap arg1, arg2 so the later-ending interval is first. This ensures we prefer the latest column order.
// We're preserving it so callers can see columns in their natural order.
if (dataSource != null) {
// This is a defensive check since SegmentMetadataQuery instantiation guarantees a non-empty set of datasources.
if (CollectionUtils.isNullOrEmpty(dataSources)) {
throw InvalidInput.exception("SegmentMetadata queries require at least one datasource.");
}
SegmentId mergedSegmentId = null;
for (String dataSource : dataSources) {
final SegmentId id1 = SegmentId.tryParse(dataSource, arg1.getId());
final SegmentId id2 = SegmentId.tryParse(dataSource, arg2.getId());
if (id1 != null && id2 != null && id2.getIntervalEnd().isAfter(id1.getIntervalEnd())) {
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
// Swap arg1, arg2 so the later-ending interval is first. This ensures we prefer the latest column order.
// We're preserving it so callers can see columns in their natural order.
if (id1 != null && id2 != null) {
if (id2.getIntervalEnd().isAfter(id1.getIntervalEnd()) ||
(id2.getIntervalEnd().isEqual(id1.getIntervalEnd()) && id2.getPartitionNum() > id1.getPartitionNum())) {
mergedSegmentId = SegmentId.merged(dataSource, id2.getInterval(), id2.getPartitionNum());
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
} else {
mergedSegmentId = SegmentId.merged(dataSource, id1.getInterval(), id1.getPartitionNum());
}
break;
}
}
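
Restated as a standalone predicate (a hypothetical helper, not part of the patch), the ordering rule above is: a segment id counts as later if its interval ends later, with the partition number breaking ties between equal interval ends.

```java
import org.apache.druid.timeline.SegmentId;

// Hypothetical helper restating the ordering rule used above: the later
// interval end wins, and the partition number breaks ties when the interval
// ends are equal.
final class LatestSegmentRule
{
  static boolean isLater(SegmentId candidate, SegmentId current)
  {
    return candidate.getIntervalEnd().isAfter(current.getIntervalEnd())
           || (candidate.getIntervalEnd().isEqual(current.getIntervalEnd())
               && candidate.getPartitionNum() > current.getPartitionNum());
  }
}
```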
@ -309,7 +325,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
final Map<String, AggregatorFactory> aggregators = new HashMap<>();
if (lenientAggregatorMerge) {
if (AggregatorMergeStrategy.LENIENT == aggregatorMergeStrategy) {
// Merge each aggregator individually, ignoring nulls
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
@ -331,7 +347,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
}
}
} else {
} else if (AggregatorMergeStrategy.STRICT == aggregatorMergeStrategy) {
final AggregatorFactory[] aggs1 = arg1.getAggregators() != null
? arg1.getAggregators()
.values()
@ -348,6 +364,20 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
aggregators.put(aggregator.getName(), aggregator);
}
}
} else if (AggregatorMergeStrategy.LATEST == aggregatorMergeStrategy) {
// The segment analyses are already ordered above, where arg1 is the analysis pertaining to the latest interval
// followed by arg2.
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
final String aggregatorName = entry.getKey();
final AggregatorFactory aggregator = entry.getValue();
aggregators.putIfAbsent(aggregatorName, aggregator);
}
}
}
} else {
throw DruidException.defensive("[%s] merge strategy is not implemented.", aggregatorMergeStrategy);
}
final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
@ -369,7 +399,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
if (arg1.getId() != null && arg2.getId() != null && arg1.getId().equals(arg2.getId())) {
mergedId = arg1.getId();
} else {
mergedId = "merged";
mergedId = mergedSegmentId == null ? "merged" : mergedSegmentId.toString();
}
final Boolean rollup;

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.metadata.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.java.util.common.StringUtils;
public enum AggregatorMergeStrategy
{
STRICT,
LENIENT,
LATEST;
@JsonValue
@Override
public String toString()
{
return StringUtils.toLowerCase(this.name());
}
@JsonCreator
public static AggregatorMergeStrategy fromString(String name)
{
return valueOf(StringUtils.toUpperCase(name));
}
}
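
A minimal serde sketch, assuming a plain Jackson ObjectMapper with the Druid classes on the classpath: the enum is written as its lowercase name via @JsonValue and read back case-insensitively via the @JsonCreator factory.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;

// Sketch of the intended serde behavior (assumes a plain Jackson ObjectMapper).
public class AggregatorMergeStrategySerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // Serialized using @JsonValue toString(): produces "latest" (with quotes).
    String json = mapper.writeValueAsString(AggregatorMergeStrategy.LATEST);

    // Deserialized via the @JsonCreator factory, which upper-cases the input.
    AggregatorMergeStrategy parsed = mapper.readValue("\"latest\"", AggregatorMergeStrategy.class);

    System.out.println(json + " -> " + parsed);  // "latest" -> latest
  }
}
```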

View File

@ -22,7 +22,7 @@ package org.apache.druid.query.metadata.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -82,7 +82,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
private final boolean merge;
private final boolean usingDefaultInterval;
private final EnumSet<AnalysisType> analysisTypes;
private final boolean lenientAggregatorMerge;
private final AggregatorMergeStrategy aggregatorMergeStrategy;
@JsonCreator
public SegmentMetadataQuery(
@ -93,7 +93,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("analysisTypes") EnumSet<AnalysisType> analysisTypes,
@JsonProperty("usingDefaultInterval") Boolean useDefaultInterval,
@JsonProperty("lenientAggregatorMerge") Boolean lenientAggregatorMerge
@Deprecated @JsonProperty("lenientAggregatorMerge") Boolean lenientAggregatorMerge,
@JsonProperty("aggregatorMergeStrategy") AggregatorMergeStrategy aggregatorMergeStrategy
)
{
super(dataSource, querySegmentSpec == null ? DEFAULT_SEGMENT_SPEC : querySegmentSpec, false, context);
@ -106,11 +107,28 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
this.analysisTypes = analysisTypes;
Preconditions.checkArgument(
dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource,
"SegmentMetadataQuery only supports table or union datasource"
);
this.lenientAggregatorMerge = lenientAggregatorMerge == null ? false : lenientAggregatorMerge;
if (!(dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource)) {
throw InvalidInput.exception("Invalid dataSource type [%s]. "
+ "SegmentMetadataQuery only supports table or union datasources.", dataSource);
}
// We validate that the user specifies at most one of the two parameters. While the deprecated property is still
// supported in the API, we set only the new member variable, from either the old or the new property, so there is a
// single source of truth for consumers of this class. The defaults preserve backwards compatibility.
// In a future release (28.0+), we can remove the deprecated lenientAggregatorMerge property.
if (lenientAggregatorMerge != null && aggregatorMergeStrategy != null) {
throw InvalidInput.exception("Both lenientAggregatorMerge [%s] and aggregatorMergeStrategy [%s] parameters cannot be set."
+ " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.",
lenientAggregatorMerge, aggregatorMergeStrategy);
}
if (lenientAggregatorMerge != null) {
this.aggregatorMergeStrategy = lenientAggregatorMerge
? AggregatorMergeStrategy.LENIENT
: AggregatorMergeStrategy.STRICT;
} else if (aggregatorMergeStrategy != null) {
this.aggregatorMergeStrategy = aggregatorMergeStrategy;
} else {
this.aggregatorMergeStrategy = AggregatorMergeStrategy.STRICT;
}
}
@JsonProperty
@ -156,9 +174,9 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
}
@JsonProperty
public boolean isLenientAggregatorMerge()
public AggregatorMergeStrategy getAggregatorMergeStrategy()
{
return lenientAggregatorMerge;
return aggregatorMergeStrategy;
}
public boolean analyzingInterval()
@ -237,7 +255,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
", merge=" + merge +
", usingDefaultInterval=" + usingDefaultInterval +
", analysisTypes=" + analysisTypes +
", lenientAggregatorMerge=" + lenientAggregatorMerge +
", aggregatorMergeStrategy=" + aggregatorMergeStrategy +
'}';
}
@ -256,9 +274,9 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
SegmentMetadataQuery that = (SegmentMetadataQuery) o;
return merge == that.merge &&
usingDefaultInterval == that.usingDefaultInterval &&
lenientAggregatorMerge == that.lenientAggregatorMerge &&
Objects.equals(toInclude, that.toInclude) &&
Objects.equals(analysisTypes, that.analysisTypes);
Objects.equals(analysisTypes, that.analysisTypes) &&
Objects.equals(aggregatorMergeStrategy, that.aggregatorMergeStrategy);
}
@Override
@ -270,7 +288,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
merge,
usingDefaultInterval,
analysisTypes,
lenientAggregatorMerge
aggregatorMergeStrategy
);
}
}

View File

@ -231,6 +231,16 @@ public final class SegmentId implements Comparable<SegmentId>
}
}
/**
* Creates a merged SegmentId for the given data source, interval and partition number. Used when segments are
* merged.
*/
public static SegmentId merged(String dataSource, Interval interval, int partitionNum)
{
return of(dataSource, interval, "merged", partitionNum);
}
/**
* Creates a dummy SegmentId with the given data source. This method is useful in benchmark and test code.
*/
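
A short sketch of the resulting identifier format; the datasource and intervals are illustrative, and the expected strings match those asserted in the updated tool chest tests:

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.SegmentId;

// Sketch of the id format produced by SegmentId.merged(); values are illustrative.
public class MergedSegmentIdSketch
{
  public static void main(String[] args)
  {
    // Partition 0 is omitted from the string, as with regular segment ids:
    // dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged
    System.out.println(SegmentId.merged("dummy", Intervals.of("2021-01-01/2021-01-02"), 0));

    // A non-zero partition number is appended:
    // dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2
    System.out.println(SegmentId.merged("dummy", Intervals.of("2023-01-01/2023-01-02"), 2));
  }
}
```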

View File

@ -249,7 +249,15 @@ public class SegmentAnalyzerTest extends InitializedNullHandlingTest
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(
new TableDataSource("test"), new LegacySegmentSpec("2011/2012"), null, null, null, analyses, false, false
new TableDataSource("test"),
new LegacySegmentSpec("2011/2012"),
null,
null,
null,
analyses,
false,
null,
null
);
return runner.run(QueryPlus.wrap(query)).toList();
}

View File

@ -23,9 +23,13 @@ package org.apache.druid.query.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -33,6 +37,7 @@ import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
@ -40,11 +45,14 @@ import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -53,18 +61,23 @@ import java.util.stream.Collectors;
public class SegmentMetadataQueryQueryToolChestTest
{
private static final DataSource TEST_DATASOURCE = new TableDataSource("dummy");
private static final SegmentId TEST_SEGMENT_ID1 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2020-01-01/2020-01-02"), "test", 0);
private static final SegmentId TEST_SEGMENT_ID2 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2021-01-01/2021-01-02"), "test", 0);
@Test
public void testCacheStrategy() throws Exception
{
SegmentMetadataQuery query = new SegmentMetadataQuery(
new TableDataSource("dummy"),
TEST_DATASOURCE,
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
false
null,
AggregatorMergeStrategy.STRICT
);
CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> strategy =
@ -76,7 +89,7 @@ public class SegmentMetadataQueryQueryToolChestTest
Assert.assertArrayEquals(expectedKey, actualKey);
SegmentAnalysis result = new SegmentAnalysis(
"testSegment",
TEST_SEGMENT_ID1.toString(),
ImmutableList.of(Intervals.of("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")),
new LinkedHashMap<>(
ImmutableMap.of(
@ -119,7 +132,7 @@ public class SegmentMetadataQueryQueryToolChestTest
public void testMergeAggregators()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -133,7 +146,7 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
@ -148,20 +161,154 @@ public class SegmentMetadataQueryQueryToolChestTest
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
mergeStrict(analysis1, analysis2).getAggregators()
null,
null,
null
),
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
}
@Test
public void testMergeAggregatorsWithIntervals()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
TEST_SEGMENT_ID1.toString(),
ImmutableList.of(TEST_SEGMENT_ID1.getInterval()),
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
mergeLenient(analysis1, analysis2).getAggregators()
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
TEST_SEGMENT_ID2.toString(),
ImmutableList.of(TEST_SEGMENT_ID2.getInterval()),
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
);
final List<Interval> expectedIntervals = new ArrayList<>();
expectedIntervals.addAll(analysis1.getIntervals());
expectedIntervals.addAll(analysis2.getIntervals());
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
expectedIntervals,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
expectedIntervals,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
expectedIntervals,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
}
@ -169,7 +316,7 @@ public class SegmentMetadataQueryQueryToolChestTest
public void testMergeAggregatorsOneNull()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -180,7 +327,7 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
@ -194,13 +341,55 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
Assert.assertEquals(
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeLenient(analysis1, analysis2).getAggregators()
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
}
@ -208,7 +397,7 @@ public class SegmentMetadataQueryQueryToolChestTest
public void testMergeAggregatorsAllNull()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -219,7 +408,7 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
@ -230,15 +419,57 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
Assert.assertNull(mergeLenient(analysis1, analysis2).getAggregators());
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
}
@Test
public void testMergeAggregatorsConflict()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -252,7 +483,7 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
@ -271,16 +502,361 @@ public class SegmentMetadataQueryQueryToolChestTest
expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
expectedLenient.put("bar", null);
expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2).getAggregators());
// Simulate multi-level merge
Assert.assertEquals(
expectedLenient,
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
expectedLenient,
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
// Simulate multi-level lenient merge
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
expectedLenient,
null,
null,
null
),
mergeLenient(
mergeLenient(analysis1, analysis2),
mergeLenient(analysis1, analysis2)
).getAggregators()
)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
// Simulate multi-level latest merge
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(
mergeLatest(analysis1, analysis2),
mergeLatest(analysis1, analysis2)
)
);
}
@Test
public void testMergeAggregatorsConflictWithDifferentOrder()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
);
final Map<String, AggregatorFactory> expectedLenient = new HashMap<>();
expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
expectedLenient.put("bar", null);
expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
expectedLenient,
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
// Simulate multi-level lenient merge
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
expectedLenient,
null,
null,
null
),
mergeLenient(
mergeLenient(analysis1, analysis2),
mergeLenient(analysis1, analysis2)
)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
// Simulate multi-level latest merge
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(
mergeLatest(analysis1, analysis2),
mergeLatest(analysis1, analysis2)
)
);
}
@Test
public void testMergeAggregatorsConflictWithEqualSegmentIntervalsAndDifferentPartitions()
{
final SegmentId segmentId1 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2023-01-01/2023-01-02"), "test", 1);
final SegmentId segmentId2 = SegmentId.of(TEST_DATASOURCE.toString(), Intervals.of("2023-01-01/2023-01-02"), "test", 2);
final SegmentAnalysis analysis1 = new SegmentAnalysis(
segmentId1.toString(),
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
segmentId2.toString(),
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
);
final Map<String, AggregatorFactory> expectedLenient = new HashMap<>();
expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
expectedLenient.put("bar", null);
expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
),
mergeStrict(analysis1, analysis2)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
null,
new LinkedHashMap<>(),
0,
0,
expectedLenient,
null,
null,
null
),
mergeLenient(analysis1, analysis2)
);
// Simulate multi-level lenient merge
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
null,
new LinkedHashMap<>(),
0,
0,
expectedLenient,
null,
null,
null
),
mergeLenient(
mergeLenient(analysis1, analysis2),
mergeLenient(analysis1, analysis2)
)
);
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(analysis1, analysis2)
);
// Simulate multi-level latest merge
Assert.assertEquals(
new SegmentAnalysis(
"dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2",
null,
new LinkedHashMap<>(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
),
mergeLatest(
mergeLatest(analysis1, analysis2),
mergeLatest(analysis1, analysis2)
)
);
}
@ -333,7 +909,7 @@ public class SegmentMetadataQueryQueryToolChestTest
public void testMergeRollup()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -344,7 +920,7 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
@ -355,7 +931,7 @@ public class SegmentMetadataQueryQueryToolChestTest
false
);
final SegmentAnalysis analysis3 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -366,7 +942,7 @@ public class SegmentMetadataQueryQueryToolChestTest
false
);
final SegmentAnalysis analysis4 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
@ -377,7 +953,7 @@ public class SegmentMetadataQueryQueryToolChestTest
true
);
final SegmentAnalysis analysis5 = new SegmentAnalysis(
"id",
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
@ -393,16 +969,87 @@ public class SegmentMetadataQueryQueryToolChestTest
Assert.assertNull(mergeStrict(analysis2, analysis4).isRollup());
Assert.assertFalse(mergeStrict(analysis2, analysis3).isRollup());
Assert.assertTrue(mergeStrict(analysis4, analysis5).isRollup());
Assert.assertNull(mergeLenient(analysis1, analysis2).isRollup());
Assert.assertNull(mergeLenient(analysis1, analysis4).isRollup());
Assert.assertNull(mergeLenient(analysis2, analysis4).isRollup());
Assert.assertFalse(mergeLenient(analysis2, analysis3).isRollup());
Assert.assertTrue(mergeLenient(analysis4, analysis5).isRollup());
Assert.assertNull(mergeLatest(analysis1, analysis2).isRollup());
Assert.assertNull(mergeLatest(analysis1, analysis4).isRollup());
Assert.assertNull(mergeLatest(analysis2, analysis4).isRollup());
Assert.assertFalse(mergeLatest(analysis2, analysis3).isRollup());
Assert.assertTrue(mergeLatest(analysis4, analysis5).isRollup());
}
@Test
public void testInvalidMergeAggregatorsWithNullOrEmptyDatasource()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
TEST_SEGMENT_ID1.toString(),
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
TEST_SEGMENT_ID2.toString(),
null,
new LinkedHashMap<>(),
0,
0,
null,
null,
null,
false
);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> SegmentMetadataQueryQueryToolChest.mergeAnalyses(
null,
analysis1,
analysis2,
AggregatorMergeStrategy.STRICT
)
),
DruidExceptionMatcher
.invalidInput()
.expectMessageIs(
"SegementMetadata queries require at least one datasource.")
);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> SegmentMetadataQueryQueryToolChest.mergeAnalyses(
ImmutableSet.of(),
analysis1,
analysis2,
AggregatorMergeStrategy.STRICT
)
),
DruidExceptionMatcher
.invalidInput()
.expectMessageIs(
"SegementMetadata queries require at least one datasource.")
);
}
private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
{
return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
SegmentMetadataQueryQueryToolChest.mergeAnalyses(
null,
TEST_DATASOURCE.getTableNames(),
analysis1,
analysis2,
false
AggregatorMergeStrategy.STRICT
)
);
}
@ -411,10 +1058,22 @@ public class SegmentMetadataQueryQueryToolChestTest
{
return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
SegmentMetadataQueryQueryToolChest.mergeAnalyses(
null,
TEST_DATASOURCE.getTableNames(),
analysis1,
analysis2,
true
AggregatorMergeStrategy.LENIENT
)
);
}
private static SegmentAnalysis mergeLatest(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
{
return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
SegmentMetadataQueryQueryToolChest.mergeAnalyses(
TEST_DATASOURCE.getTableNames(),
analysis1,
analysis2,
AggregatorMergeStrategy.LATEST
)
);
}

View File

@ -20,20 +20,27 @@
package org.apache.druid.query.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
@ -42,22 +49,28 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.ListColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -793,7 +806,74 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Collections.singletonList("placement")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
.merge(true) // if the aggregator merge strategy is unspecified, it defaults to strict.
.build();
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(QueryPlus.wrap(query)),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
}
@Test
public void testSegmentMetadataQueryWithAggregatorsMergeLenientStrategy()
{
final Map<String, AggregatorFactory> expectedAggregators = new HashMap<>();
for (AggregatorFactory agg : TestIndex.METRIC_AGGS) {
expectedAggregators.put(agg.getName(), agg.getCombiningFactory());
}
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
differentIds ? "merged" : SegmentId.dummy("testSegment").toString(),
null,
new LinkedHashMap<>(
ImmutableMap.of(
"placement",
new ColumnAnalysis(
ColumnType.STRING,
ValueType.STRING.toString(),
false,
false,
0,
0,
NullHandling.defaultStringValue(),
NullHandling.defaultStringValue(),
null
)
)
),
0,
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
expectedAggregators,
null,
null,
null
);
QueryToolChest toolChest = FACTORY.getToolchest();
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
);
SegmentMetadataQuery query = Druids
.newSegmentMetadataQueryBuilder()
.dataSource("testing222")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Collections.singletonList("placement")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
.merge(true)
.aggregatorMergeStrategy(AggregatorMergeStrategy.LENIENT)
.build();
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
@ -990,9 +1070,13 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
query.getIntervals().get(0)
);
Assert.assertEquals(expectedAnalysisTypes, ((SegmentMetadataQuery) query).getAnalysisTypes());
Assert.assertEquals(AggregatorMergeStrategy.STRICT, ((SegmentMetadataQuery) query).getAggregatorMergeStrategy());
// test serialize and deserialize
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
// test copy
Assert.assertEquals(query, Druids.SegmentMetadataQueryBuilder.copy((SegmentMetadataQuery) query).build());
}
@Test
@ -1004,9 +1088,11 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
+ "}";
Query query = MAPPER.readValue(queryStr, Query.class);
Assert.assertTrue(query instanceof SegmentMetadataQuery);
Assert.assertTrue(query.getDataSource() instanceof TableDataSource);
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames()));
Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0));
Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval());
Assert.assertEquals(AggregatorMergeStrategy.STRICT, ((SegmentMetadataQuery) query).getAggregatorMergeStrategy());
// test serialize and deserialize
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
@ -1015,6 +1101,51 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
Assert.assertEquals(query, Druids.SegmentMetadataQueryBuilder.copy((SegmentMetadataQuery) query).build());
}
@Test
public void testSerdeWithLatestAggregatorStrategy() throws Exception
{
String queryStr = "{\n"
+ " \"queryType\":\"segmentMetadata\",\n"
+ " \"dataSource\":\"test_ds\",\n"
+ " \"aggregatorMergeStrategy\":\"latest\"\n"
+ "}";
Query query = MAPPER.readValue(queryStr, Query.class);
Assert.assertTrue(query instanceof SegmentMetadataQuery);
Assert.assertTrue(query.getDataSource() instanceof TableDataSource);
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames()));
Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0));
Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval());
Assert.assertEquals(AggregatorMergeStrategy.LATEST, ((SegmentMetadataQuery) query).getAggregatorMergeStrategy());
// test serialize and deserialize
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
// test copy
Assert.assertEquals(query, Druids.SegmentMetadataQueryBuilder.copy((SegmentMetadataQuery) query).build());
}
@Test
public void testSerdeWithBothDeprecatedAndNewParameters()
{
String queryStr = "{\n"
+ " \"queryType\":\"segmentMetadata\",\n"
+ " \"dataSource\":\"test_ds\",\n"
+ " \"lenientAggregatorMerge\":\"true\",\n"
+ " \"aggregatorMergeStrategy\":\"lenient\"\n"
+ "}";
ValueInstantiationException exception = Assert.assertThrows(
ValueInstantiationException.class,
() -> MAPPER.readValue(queryStr, Query.class)
);
Assert.assertTrue(
exception.getCause().getMessage().contains(
"Both lenientAggregatorMerge [true] and aggregatorMergeStrategy [lenient] parameters cannot be set. Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated."
)
);
}
@Test
public void testDefaultIntervalAndFiltering()
{
@ -1304,7 +1435,6 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
@Test
public void testAnanlysisTypesBeingSet()
{
SegmentMetadataQuery query1 = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.toInclude(new ListColumnIncluderator(Collections.singletonList("foo")))
@ -1408,4 +1538,253 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest
);
testSegmentMetadataQueryWithDefaultAnalysisMerge("null_column", analysis);
}
@Test
public void testSegmentMetadataQueryWithInvalidDatasourceTypes()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new SegmentMetadataQuery(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[0]),
RowSignature.builder().add("column", ColumnType.STRING).build()
),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
null
)
),
DruidExceptionMatcher
.invalidInput()
.expectMessageIs(
"Invalid dataSource type [InlineDataSource{signature={column:STRING}}]. SegmentMetadataQuery only supports table or union datasources.")
);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new SegmentMetadataQuery(
new LookupDataSource("lookyloo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
null
)
),
DruidExceptionMatcher
.invalidInput()
.expectMessageIs(
"Invalid dataSource type [LookupDataSource{lookupName='lookyloo'}]. SegmentMetadataQuery only supports table or union datasources.")
);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new SegmentMetadataQuery(
JoinDataSource.create(
new TableDataSource("table1"),
new TableDataSource("table2"),
"j.",
"x == \"j.x\"",
JoinType.LEFT,
null,
ExprMacroTable.nil(),
null
),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
null
)
),
DruidExceptionMatcher
.invalidInput()
.expectMessageIs(
"Invalid dataSource type [JoinDataSource{left=table1, right=table2, rightPrefix='j.', condition=x == \"j.x\", joinType=LEFT, leftFilter=null}]. SegmentMetadataQuery only supports table or union datasources.")
);
}
@Test
public void testSegmentMetadataQueryWithAggregatorMergeStrictStrategy()
{
// This is the default behavior -- if nothing is specified, the merge strategy is strict.
Assert.assertEquals(
AggregatorMergeStrategy.STRICT,
new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
null
).getAggregatorMergeStrategy()
);
Assert.assertEquals(
AggregatorMergeStrategy.STRICT,
new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
false,
null
).getAggregatorMergeStrategy()
);
Assert.assertEquals(
AggregatorMergeStrategy.STRICT,
new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
AggregatorMergeStrategy.STRICT
).getAggregatorMergeStrategy()
);
}
@Test
public void testSegmentMetadataQueryWithAggregatorMergeLenientStrategy()
{
Assert.assertEquals(
AggregatorMergeStrategy.LENIENT,
new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
true,
null
).getAggregatorMergeStrategy()
);
Assert.assertEquals(
AggregatorMergeStrategy.LENIENT,
new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
AggregatorMergeStrategy.LENIENT
).getAggregatorMergeStrategy()
);
}
@Test
public void testSegmentMetadataQueryWithAggregatorMergeLatestStrategy()
{
Assert.assertEquals(
AggregatorMergeStrategy.LATEST,
new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
null,
AggregatorMergeStrategy.LATEST
).getAggregatorMergeStrategy()
);
}
@Test
public void testSegmentMetadataQueryWithBothDeprecatedAndNewParameter()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
false,
AggregatorMergeStrategy.STRICT
)
),
DruidExceptionMatcher.invalidInput()
.expectMessageIs(
"Both lenientAggregatorMerge [false] and aggregatorMergeStrategy [strict] parameters cannot be set."
+ " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.")
);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
true,
AggregatorMergeStrategy.LENIENT
)
),
DruidExceptionMatcher.invalidInput()
.expectMessageIs(
"Both lenientAggregatorMerge [true] and aggregatorMergeStrategy [lenient] parameters cannot be set."
+ " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.")
);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new SegmentMetadataQuery(
new TableDataSource("foo"),
new LegacySegmentSpec("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
false,
AggregatorMergeStrategy.LATEST
)
),
DruidExceptionMatcher.invalidInput()
.expectMessageIs(
"Both lenientAggregatorMerge [false] and aggregatorMergeStrategy [latest] parameters cannot be set."
+ " Consider using aggregatorMergeStrategy since lenientAggregatorMerge is deprecated.")
);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.metadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
@ -50,9 +49,6 @@ import java.util.List;
@RunWith(Parameterized.class)
public class SegmentMetadataUnionQueryTest extends InitializedNullHandlingTest
{
static {
NullHandling.initializeForTests();
}
private static final QueryRunnerFactory FACTORY = new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),

View File

@ -56,6 +56,7 @@ public class FilteredRequestLoggerTest
null,
null,
null,
null,
null
);

View File

@ -241,7 +241,8 @@ public class DumpSegment extends GuiceRunnable
null,
EnumSet.allOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
null,
null
);
withOutputStream(
new Function<OutputStream, Object>()

View File

@ -932,7 +932,8 @@ public class SegmentMetadataCache
),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
null,
null // we don't care about merging strategy because merge is false
);
return queryLifecycleFactory

View File

@ -1309,7 +1309,8 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
queryContext,
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
null,
null
);
QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);

View File

@ -312,7 +312,7 @@ export async function sampleForConnect(
dataSource,
intervals,
merge: true,
lenientAggregatorMerge: true,
aggregatorMergeStrategy: 'lenient',
analysisTypes: ['aggregators', 'rollup'],
});

View File

@ -1799,6 +1799,7 @@ InsensitiveContainsSearchQuerySpec
RegexSearchQuerySpec
analysisType
analysisTypes
aggregatorMergeStrategy
lenientAggregatorMerge
minmax
segmentMetadata