From d416279c141ee24873f4959846789235d9b8517e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 Jan 2016 17:05:46 -0800 Subject: [PATCH] SegmentMetadataQuery support for returning aggregators. --- docs/content/querying/segmentmetadataquery.md | 37 +++- .../src/main/java/io/druid/query/Druids.java | 12 +- .../SegmentMetadataQueryQueryToolChest.java | 159 ++++++++++----- .../SegmentMetadataQueryRunnerFactory.java | 24 ++- .../metadata/metadata/SegmentAnalysis.java | 58 +++--- .../metadata/SegmentMetadataQuery.java | 63 +++--- .../segment/QueryableIndexStorageAdapter.java | 5 + .../java/io/druid/segment/StorageAdapter.java | 1 + .../IncrementalIndexStorageAdapter.java | 7 + .../query/metadata/SegmentAnalyzerTest.java | 2 +- ...egmentMetadataQueryQueryToolChestTest.java | 184 +++++++++++++++++- .../metadata/SegmentMetadataQueryTest.java | 73 ++++++- .../test/java/io/druid/segment/TestIndex.java | 2 +- 13 files changed, 497 insertions(+), 130 deletions(-) diff --git a/docs/content/querying/segmentmetadataquery.md b/docs/content/querying/segmentmetadataquery.md index 1c083fdaeef..469da4b47ec 100644 --- a/docs/content/querying/segmentmetadataquery.md +++ b/docs/content/querying/segmentmetadataquery.md @@ -31,6 +31,7 @@ There are several main parts to a segment metadata query: |merge|Merge all individual segment metadata results into a single result|no| |context|See [Context](../querying/query-context.html)|no| |analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size", "interval"]. See section [analysisTypes](#analysistypes) for more details.|no| +|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no| The format of the result is: @@ -44,6 +45,9 @@ The format of the result is: "dim2" : { "type" : "STRING", "hasMultipleValues" : true, "size" : 100000, "cardinality" : 1504, "errorMessage" : null }, "metric1" : { "type" : "FLOAT", "hasMultipleValues" : false, "size" : 100000, "cardinality" : null, "errorMessage" : null } }, + "aggregators" : { + "metric1" : { "type" : "longSum", "name" : "metric1", "fieldName" : "metric1" } + }, "size" : 300000, "numRows" : 5000000 } ] @@ -99,18 +103,39 @@ This is a list of properties that determines the amount of information returned By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query. -There are 3 types of column analyses: +There are four types of column analyses: #### cardinality -* Estimated floor of cardinality for each column. Only relevant for dimension columns. +* `cardinality` in the result will return the estimated floor of cardinality for each column. Only relevant for +dimension columns. #### size -* Estimated byte size for the segment columns if they were stored in a flat format - -* Estimated total segment byte size in if it was stored in a flat format +* `size` in the result will contain the estimated total segment byte size as if the data were stored in text format #### interval -* If present, the SegmentMetadataQuery will return the list of intervals associated with the queried segments. +* `intervals` in the result will contain the list of intervals associated with the queried segments. + +#### aggregators + +* `aggregators` in the result will contain the list of aggregators usable for querying metric columns. 
This may be +null if the aggregators are unknown or unmergeable (if merging is enabled). + +* Merging can be strict or lenient. See *lenientAggregatorMerge* below for details. + +* The form of the result is a map of column name to aggregator. + +### lenientAggregatorMerge + +Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if +two segments use incompatible aggregators for the same column (e.g. longSum changed to doubleSum). + +Aggregators can be merged strictly (the default) or leniently. With strict merging, if there are any segments +with unknown aggregators, or any conflicts of any kind, the merged aggregators list will be `null`. With lenient +merging, segments with unknown aggregators will be ignored, and conflicts between aggregators will only null out +the aggregator for that particular column. + +In particular, with lenient merging, it is possible for an invidiual column's aggregator to be `null`. This will not +occur with strict merging. diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 24e241ccc4e..d20a6530f3e 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -938,6 +938,7 @@ public class Druids private ColumnIncluderator toInclude; private EnumSet analysisTypes; private Boolean merge; + private Boolean lenientAggregatorMerge; private Map context; public SegmentMetadataQueryBuilder() @@ -948,6 +949,7 @@ public class Druids analysisTypes = null; merge = null; context = null; + lenientAggregatorMerge = null; } public SegmentMetadataQuery build() @@ -959,7 +961,8 @@ public class Druids merge, context, analysisTypes, - false + false, + lenientAggregatorMerge ); } @@ -975,6 +978,7 @@ public class Druids .toInclude(toInclude) .analysisTypes(analysisTypesArray) .merge(merge) + .lenientAggregatorMerge(lenientAggregatorMerge) .context(builder.context); } @@ -1032,6 +1036,12 @@ public class Druids return this; } + public SegmentMetadataQueryBuilder lenientAggregatorMerge(boolean lenientAggregatorMerge) + { + this.lenientAggregatorMerge = lenientAggregatorMerge; + return this; + } + public SegmentMetadataQueryBuilder context(Map c) { context = c; diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index b9eafd1763d..1e640dacb30 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -20,9 +20,11 @@ package io.druid.query.metadata; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,6 +43,8 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.ResultMergeQueryRunner; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.MetricManipulationFn; import 
io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; @@ -51,7 +55,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,13 +71,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest newIntervals = null; - if (query.analyzingInterval()) { - //List returned by arg1.getIntervals() is immutable, so a new list needs to - //be created. - newIntervals = new ArrayList<>(arg1.getIntervals()); - newIntervals.addAll(arg2.getIntervals()); - } - - final Map leftColumns = arg1.getColumns(); - final Map rightColumns = arg2.getColumns(); - Map columns = Maps.newTreeMap(); - - Set rightColumnNames = Sets.newHashSet(rightColumns.keySet()); - for (Map.Entry entry : leftColumns.entrySet()) { - final String columnName = entry.getKey(); - columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName))); - rightColumnNames.remove(columnName); - } - - for (String columnName : rightColumnNames) { - columns.put(columnName, rightColumns.get(columnName)); - } - - return new SegmentAnalysis( - "merged", - newIntervals, - columns, - arg1.getSize() + arg2.getSize(), - arg1.getNumRows() + arg2.getNumRows() - ); + return mergeAnalyses(arg1, arg2, query.isLenientAggregatorMerge()); } }; } @@ -284,4 +245,110 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest newIntervals = null; + if (arg1.getIntervals() != null) { + newIntervals = Lists.newArrayList(); + newIntervals.addAll(arg1.getIntervals()); + } + if (arg2.getIntervals() != null) { + if (newIntervals == null) { + newIntervals = Lists.newArrayList(); + } + newIntervals.addAll(arg2.getIntervals()); + } + + final Map leftColumns = arg1.getColumns(); + final Map rightColumns = arg2.getColumns(); + Map columns = Maps.newTreeMap(); + + Set rightColumnNames = Sets.newHashSet(rightColumns.keySet()); + for (Map.Entry entry : leftColumns.entrySet()) { + final String columnName = entry.getKey(); + columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName))); + rightColumnNames.remove(columnName); + } + + for (String columnName : rightColumnNames) { + columns.put(columnName, rightColumns.get(columnName)); + } + + final Map aggregators = Maps.newHashMap(); + + if (lenientAggregatorMerge) { + // Merge each aggregator individually, ignoring nulls + for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) { + if (analysis.getAggregators() != null) { + for (AggregatorFactory aggregator : analysis.getAggregators().values()) { + AggregatorFactory merged = aggregators.get(aggregator.getName()); + if (merged != null) { + try { + merged = merged.getMergingFactory(aggregator); + } + catch (AggregatorFactoryNotMergeableException e) { + merged = null; + } + } else { + merged = aggregator; + } + aggregators.put(aggregator.getName(), merged); + } + } + } + } else { + final AggregatorFactory[] aggs1 = arg1.getAggregators() != null + ? arg1.getAggregators() + .values() + .toArray(new AggregatorFactory[arg1.getAggregators().size()]) + : null; + final AggregatorFactory[] aggs2 = arg2.getAggregators() != null + ? 
arg2.getAggregators() + .values() + .toArray(new AggregatorFactory[arg2.getAggregators().size()]) + : null; + final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2)); + if (merged != null) { + for (AggregatorFactory aggregator : merged) { + aggregators.put(aggregator.getName(), aggregator); + } + } + } + + return new SegmentAnalysis( + "merged", + newIntervals, + columns, + arg1.getSize() + arg2.getSize(), + arg1.getNumRows() + arg2.getNumRows(), + aggregators.isEmpty() ? null : aggregators + ); + } + + @VisibleForTesting + public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis) + { + return new SegmentAnalysis( + analysis.getId(), + analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null, + analysis.getColumns(), + analysis.getSize(), + analysis.getNumRows(), + analysis.getAggregators() + ); + } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 0a8f35d87a2..8017446c9a6 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -39,10 +39,12 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; import io.druid.query.QueryWatcher; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.ColumnIncluderator; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.segment.Metadata; import io.druid.segment.Segment; import org.joda.time.Interval; @@ -108,6 +110,21 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory retIntervals = query.analyzingInterval() ? 
Arrays.asList(segment.getDataInterval()) : null; + final Map aggregators; + if (query.hasAggregators()) { + final Metadata metadata = segment.asStorageAdapter().getMetadata(); + if (metadata != null && metadata.getAggregators() != null) { + aggregators = Maps.newHashMap(); + for (AggregatorFactory aggregator : metadata.getAggregators()) { + aggregators.put(aggregator.getName(), aggregator); + } + } else { + aggregators = null; + } + } else { + aggregators = null; + } + return Sequences.simple( Arrays.asList( new SegmentAnalysis( @@ -115,7 +132,8 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory { @@ -33,6 +35,7 @@ public class SegmentAnalysis implements Comparable private final Map columns; private final long size; private final int numRows; + private final Map aggregators; @JsonCreator public SegmentAnalysis( @@ -40,8 +43,8 @@ public class SegmentAnalysis implements Comparable @JsonProperty("intervals") List interval, @JsonProperty("columns") Map columns, @JsonProperty("size") long size, - @JsonProperty("numRows") int numRows - + @JsonProperty("numRows") int numRows, + @JsonProperty("aggregators") Map aggregators ) { this.id = id; @@ -49,6 +52,7 @@ public class SegmentAnalysis implements Comparable this.columns = columns; this.size = size; this.numRows = numRows; + this.aggregators = aggregators; } @JsonProperty @@ -81,15 +85,10 @@ public class SegmentAnalysis implements Comparable return numRows; } - public String toDetailedString() + @JsonProperty + public Map getAggregators() { - return "SegmentAnalysis{" + - "id='" + id + '\'' + - ", interval=" + interval + - ", columns=" + columns + - ", size=" + size + - ", numRows=" + numRows + - '}'; + return aggregators; } @Override @@ -101,9 +100,13 @@ public class SegmentAnalysis implements Comparable ", columns=" + columns + ", size=" + size + ", numRows=" + numRows + + ", aggregators=" + aggregators + '}'; } + /** + * Best-effort equals method; relies on AggregatorFactory.equals, which is not guaranteed to be sanely implemented. + */ @Override public boolean equals(Object o) { @@ -113,36 +116,23 @@ public class SegmentAnalysis implements Comparable if (o == null || getClass() != o.getClass()) { return false; } - SegmentAnalysis that = (SegmentAnalysis) o; - - if (size != that.size) { - return false; - } - - if (numRows != that.numRows) { - return false; - } - - if (id != null ? !id.equals(that.id) : that.id != null) { - return false; - } - if (interval != null ? !interval.equals(that.interval) : that.interval != null) { - return false; - } - return !(columns != null ? !columns.equals(that.columns) : that.columns != null); - + return size == that.size && + numRows == that.numRows && + Objects.equals(id, that.id) && + Objects.equals(interval, that.interval) && + Objects.equals(columns, that.columns) && + Objects.equals(aggregators, that.aggregators); } + /** + * Best-effort hashCode method; relies on AggregatorFactory.hashCode, which is not guaranteed to be sanely + * implemented. + */ @Override public int hashCode() { - int result = id != null ? id.hashCode() : 0; - result = 31 * result + (interval != null ? interval.hashCode() : 0); - result = 31 * result + (columns != null ? 
columns.hashCode() : 0); - result = 31 * result + (int) (size ^ (size >>> 32)); - result = 31 * result + (int) (numRows ^ (numRows >>> 32)); - return result; + return Objects.hash(id, interval, columns, size, numRows, aggregators); } @Override diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java index 203a8aac755..3b270be966c 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; public class SegmentMetadataQuery extends BaseQuery { @@ -51,7 +52,8 @@ public class SegmentMetadataQuery extends BaseQuery { CARDINALITY, SIZE, - INTERVAL; + INTERVAL, + AGGREGATORS; @JsonValue @Override @@ -86,6 +88,7 @@ public class SegmentMetadataQuery extends BaseQuery private final boolean merge; private final boolean usingDefaultInterval; private final EnumSet analysisTypes; + private final boolean lenientAggregatorMerge; @JsonCreator public SegmentMetadataQuery( @@ -95,7 +98,8 @@ public class SegmentMetadataQuery extends BaseQuery @JsonProperty("merge") Boolean merge, @JsonProperty("context") Map context, @JsonProperty("analysisTypes") EnumSet analysisTypes, - @JsonProperty("usingDefaultInterval") Boolean useDefaultInterval + @JsonProperty("usingDefaultInterval") Boolean useDefaultInterval, + @JsonProperty("lenientAggregatorMerge") Boolean lenientAggregatorMerge ) { super( @@ -118,6 +122,7 @@ public class SegmentMetadataQuery extends BaseQuery dataSource instanceof TableDataSource, "SegmentMetadataQuery only supports table datasource" ); + this.lenientAggregatorMerge = lenientAggregatorMerge == null ? 
false : lenientAggregatorMerge; } @JsonProperty @@ -156,11 +161,22 @@ public class SegmentMetadataQuery extends BaseQuery return analysisTypes; } + @JsonProperty + public boolean isLenientAggregatorMerge() + { + return lenientAggregatorMerge; + } + public boolean analyzingInterval() { return analysisTypes.contains(AnalysisType.INTERVAL); } + public boolean hasAggregators() + { + return analysisTypes.contains(AnalysisType.AGGREGATORS); + } + public byte[] getAnalysisTypesCacheKey() { int size = 1; @@ -191,7 +207,8 @@ public class SegmentMetadataQuery extends BaseQuery merge, computeOverridenContext(contextOverride), analysisTypes, - usingDefaultInterval + usingDefaultInterval, + lenientAggregatorMerge ); } @@ -205,7 +222,8 @@ public class SegmentMetadataQuery extends BaseQuery merge, getContext(), analysisTypes, - usingDefaultInterval + usingDefaultInterval, + lenientAggregatorMerge ); } @@ -219,7 +237,8 @@ public class SegmentMetadataQuery extends BaseQuery merge, getContext(), analysisTypes, - usingDefaultInterval + usingDefaultInterval, + lenientAggregatorMerge ); } @@ -233,6 +252,7 @@ public class SegmentMetadataQuery extends BaseQuery ", merge=" + merge + ", usingDefaultInterval=" + usingDefaultInterval + ", analysisTypes=" + analysisTypes + + ", lenientAggregatorMerge=" + lenientAggregatorMerge + '}'; } @@ -248,31 +268,24 @@ public class SegmentMetadataQuery extends BaseQuery if (!super.equals(o)) { return false; } - SegmentMetadataQuery that = (SegmentMetadataQuery) o; - - if (merge != that.merge) { - return false; - } - if (usingDefaultInterval != that.usingDefaultInterval) { - return false; - } - - if (!analysisTypes.equals(that.analysisTypes)) { - return false; - } - return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null); - + return merge == that.merge && + usingDefaultInterval == that.usingDefaultInterval && + lenientAggregatorMerge == that.lenientAggregatorMerge && + Objects.equals(toInclude, that.toInclude) && + Objects.equals(analysisTypes, that.analysisTypes); } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0); - result = 31 * result + (merge ? 1 : 0); - result = 31 * result + (usingDefaultInterval ? 
1 : 0); - result = 31 * result + analysisTypes.hashCode(); - return result; + return Objects.hash( + super.hashCode(), + toInclude, + merge, + usingDefaultInterval, + analysisTypes, + lenientAggregatorMerge + ); } } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index b95764dceb9..c9eba3c4203 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -899,4 +899,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } + @Override + public Metadata getMetadata() + { + return index.getMetadata(); + } } diff --git a/processing/src/main/java/io/druid/segment/StorageAdapter.java b/processing/src/main/java/io/druid/segment/StorageAdapter.java index 5df2e6c477b..b557757dffa 100644 --- a/processing/src/main/java/io/druid/segment/StorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/StorageAdapter.java @@ -55,4 +55,5 @@ public interface StorageAdapter extends CursorFactory public String getColumnTypeName(String column); public int getNumRows(); public DateTime getMaxIngestedEventTime(); + public Metadata getMetadata(); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 0fe739d50e6..ce9bc7f45d7 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -41,6 +41,7 @@ import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; +import io.druid.segment.Metadata; import io.druid.segment.NullDimensionSelector; import io.druid.segment.ObjectColumnSelector; import io.druid.segment.SingleScanTimeDimSelector; @@ -734,4 +735,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter }; } } + + @Override + public Metadata getMetadata() + { + return index.getMetadata(); + } } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index 6762a295143..b7beeebeed4 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -171,7 +171,7 @@ public class SegmentAnalyzerTest ); final SegmentMetadataQuery query = new SegmentMetadataQuery( - new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, analyses, false + new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, analyses, false, false ); HashMap context = new HashMap(); return Sequences.toList(query.run(runner, context), Lists.newArrayList()); diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index 955b3f0436d..b01c94e52f2 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -23,9 +23,15 @@ package 
io.druid.query.metadata; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.CacheStrategy; import io.druid.query.TableDataSource; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -35,19 +41,22 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import java.util.Map; + public class SegmentMetadataQueryQueryToolChestTest { @Test public void testCacheStrategy() throws Exception { SegmentMetadataQuery query = new SegmentMetadataQuery( - new TableDataSource("dummy"), - QuerySegmentSpecs.create("2015-01-01/2015-01-02"), - null, - null, - null, - null, - false + new TableDataSource("dummy"), + QuerySegmentSpecs.create("2015-01-01/2015-01-02"), + null, + null, + null, + null, + false, + false ); CacheStrategy strategy = @@ -73,7 +82,8 @@ public class SegmentMetadataQueryQueryToolChestTest null ) ), 71982, - 100 + 100, + null ); Object preparedValue = strategy.prepareForCache().apply(result); @@ -88,4 +98,162 @@ public class SegmentMetadataQueryQueryToolChestTest Assert.assertEquals(result, fromCacheResult); } + + @Test + public void testMergeAggregators() + { + final SegmentAnalysis analysis1 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ) + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ) + ); + + Assert.assertEquals( + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + mergeStrict(analysis1, analysis2).getAggregators() + ); + Assert.assertEquals( + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar"), + "baz", new DoubleSumAggregatorFactory("baz", "baz") + ), + mergeLenient(analysis1, analysis2).getAggregators() + ); + } + + @Test + public void testMergeAggregatorsOneNull() + { + final SegmentAnalysis analysis1 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ) + ); + + Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators()); + Assert.assertEquals( + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ), + mergeLenient(analysis1, analysis2).getAggregators() + ); + } + + @Test + public void testMergeAggregatorsAllNull() + { + final SegmentAnalysis analysis1 = new 
SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null + ); + + Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators()); + Assert.assertNull(mergeLenient(analysis1, analysis2).getAggregators()); + } + + @Test + public void testMergeAggregatorsConflict() + { + final SegmentAnalysis analysis1 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleSumAggregatorFactory("bar", "bar") + ) + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + ImmutableMap.of( + "foo", new LongSumAggregatorFactory("foo", "foo"), + "bar", new DoubleMaxAggregatorFactory("bar", "bar"), + "baz", new LongMaxAggregatorFactory("baz", "baz") + ) + ); + + final Map expectedLenient = Maps.newHashMap(); + expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo")); + expectedLenient.put("bar", null); + expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz")); + Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators()); + Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2).getAggregators()); + } + + private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2) + { + return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( + SegmentMetadataQueryQueryToolChest.mergeAnalyses( + analysis1, + analysis2, + false + ) + ); + } + + private static SegmentAnalysis mergeLenient(SegmentAnalysis analysis1, SegmentAnalysis analysis2) + { + return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( + SegmentMetadataQueryQueryToolChest.mergeAnalyses( + analysis1, + analysis2, + true + ) + ); + } } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index eae34879215..a466ea63dad 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -39,6 +39,7 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.ListColumnIncluderator; import io.druid.query.metadata.metadata.SegmentAnalysis; @@ -59,6 +60,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -150,7 +152,8 @@ public class SegmentMetadataQueryTest null ) ), usingMmappedSegment ? 
71982 : 32643, - 1209 + 1209, + null ); } @@ -191,7 +194,8 @@ public class SegmentMetadataQueryTest ) ), 0, - expectedSegmentAnalysis.getNumRows() * 2 + expectedSegmentAnalysis.getNumRows() * 2, + null ); QueryToolChest toolChest = FACTORY.getToolchest(); @@ -250,7 +254,8 @@ public class SegmentMetadataQueryTest ) ), 0, - expectedSegmentAnalysis.getNumRows() * 2 + expectedSegmentAnalysis.getNumRows() * 2, + null ); QueryToolChest toolChest = FACTORY.getToolchest(); @@ -317,7 +322,8 @@ public class SegmentMetadataQueryTest ) ), expectedSegmentAnalysis.getSize() * 2, - expectedSegmentAnalysis.getNumRows() * 2 + expectedSegmentAnalysis.getNumRows() * 2, + null ); QueryToolChest toolChest = FACTORY.getToolchest(); @@ -362,7 +368,8 @@ public class SegmentMetadataQueryTest ) ), 0, - expectedSegmentAnalysis.getNumRows() * 2 + expectedSegmentAnalysis.getNumRows() * 2, + null ); QueryToolChest toolChest = FACTORY.getToolchest(); @@ -396,6 +403,62 @@ public class SegmentMetadataQueryTest exec.shutdownNow(); } + @Test + public void testSegmentMetadataQueryWithAggregatorsMerge() + { + final Map expectedAggregators = Maps.newHashMap(); + for (AggregatorFactory agg : TestIndex.METRIC_AGGS) { + expectedAggregators.put(agg.getName(), agg.getCombiningFactory()); + } + SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis( + "merged", + null, + ImmutableMap.of( + "placement", + new ColumnAnalysis( + ValueType.STRING.toString(), + false, + 0, + 0, + null + ) + ), + 0, + expectedSegmentAnalysis.getNumRows() * 2, + expectedAggregators + ); + + QueryToolChest toolChest = FACTORY.getToolchest(); + + QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner); + ExecutorService exec = Executors.newCachedThreadPool(); + QueryRunner myRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + FACTORY.mergeRunners( + MoreExecutors.sameThreadExecutor(), + Lists.>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner) + ) + ), + toolChest + ); + + TestHelper.assertExpectedObjects( + ImmutableList.of(mergedSegmentAnalysis), + myRunner.run( + Druids.newSegmentMetadataQueryBuilder() + .dataSource("testing") + .intervals("2013/2014") + .toInclude(new ListColumnIncluderator(Arrays.asList("placement"))) + .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS) + .merge(true) + .build(), + Maps.newHashMap() + ), + "failed SegmentMetadata merging query" + ); + exec.shutdownNow(); + } + @Test public void testBySegmentResults() { diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 580307106be..8014e999430 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -74,7 +74,7 @@ public class TestIndex public static final String[] METRICS = new String[]{"index"}; private static final Logger log = new Logger(TestIndex.class); private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"); - private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{ + public static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{ new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]), new HyperUniquesAggregatorFactory("quality_uniques", "quality") };
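
As a usage illustration (not part of the patch itself), the sketch below shows how a caller could build a segmentMetadata query that requests only the new `aggregators` analysis with lenient merging, using the builder methods added or exercised in this patch. The data source name and interval are placeholders mirroring the test cases above; assume Druid's processing module is on the classpath.

```java
// Sketch only: a query using the new "aggregators" analysis type and lenient merging.
// "testing" and "2013/2014" are placeholder values borrowed from the tests above.
import io.druid.query.Druids;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;

public class LenientAggregatorMergeExample
{
  public static SegmentMetadataQuery buildQuery()
  {
    return Druids.newSegmentMetadataQueryBuilder()
                 .dataSource("testing")
                 .intervals("2013/2014")
                 .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
                 .merge(true)
                 // Ignore segments with unknown aggregators and null out only the
                 // conflicting columns, instead of nulling the whole merged map.
                 .lenientAggregatorMerge(true)
                 .build();
  }
}
```

With strict merging (the default, `lenientAggregatorMerge(false)` or omitted), any segment with unknown aggregators or any per-column conflict would instead cause the merged `aggregators` map to come back as `null`, as described in the documentation changes above.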