Merge pull request #2295 from gianm/segment-metadata-query-query-metadata

SegmentMetadataQuery support for returning aggregators.
This commit is contained in:
Fangjin Yang 2016-01-21 19:12:46 -08:00
commit f03b65480f
13 changed files with 497 additions and 130 deletions

View File

@ -31,6 +31,7 @@ There are several main parts to a segment metadata query:
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.html)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size", "interval"]. See section [analysisTypes](#analysistypes) for more details.|no|
|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no|
The format of the result is:
@ -44,6 +45,9 @@ The format of the result is:
"dim2" : { "type" : "STRING", "hasMultipleValues" : true, "size" : 100000, "cardinality" : 1504, "errorMessage" : null },
"metric1" : { "type" : "FLOAT", "hasMultipleValues" : false, "size" : 100000, "cardinality" : null, "errorMessage" : null }
},
"aggregators" : {
"metric1" : { "type" : "longSum", "name" : "metric1", "fieldName" : "metric1" }
},
"size" : 300000,
"numRows" : 5000000
} ]
@ -99,18 +103,39 @@ This is a list of properties that determines the amount of information returned
By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.
There are 3 types of column analyses:
There are four types of column analyses:
#### cardinality
* Estimated floor of cardinality for each column. Only relevant for dimension columns.
* `cardinality` in the result will return the estimated floor of cardinality for each column. Only relevant for
dimension columns.
#### size
* Estimated byte size for the segment columns if they were stored in a flat format
* Estimated total segment byte size in if it was stored in a flat format
* `size` in the result will contain the estimated total segment byte size as if the data were stored in text format
#### interval
* If present, the SegmentMetadataQuery will return the list of intervals associated with the queried segments.
* `intervals` in the result will contain the list of intervals associated with the queried segments.
#### aggregators
* `aggregators` in the result will contain the list of aggregators usable for querying metric columns. This may be
null if the aggregators are unknown or unmergeable (if merging is enabled).
* Merging can be strict or lenient. See *lenientAggregatorMerge* below for details.
* The form of the result is a map of column name to aggregator.
### lenientAggregatorMerge
Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
two segments use incompatible aggregators for the same column (e.g. longSum changed to doubleSum).
Aggregators can be merged strictly (the default) or leniently. With strict merging, if there are any segments
with unknown aggregators, or any conflicts of any kind, the merged aggregators list will be `null`. With lenient
merging, segments with unknown aggregators will be ignored, and conflicts between aggregators will only null out
the aggregator for that particular column.
In particular, with lenient merging, it is possible for an invidiual column's aggregator to be `null`. This will not
occur with strict merging.

View File

@ -938,6 +938,7 @@ public class Druids
private ColumnIncluderator toInclude;
private EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
private Boolean merge;
private Boolean lenientAggregatorMerge;
private Map<String, Object> context;
public SegmentMetadataQueryBuilder()
@ -948,6 +949,7 @@ public class Druids
analysisTypes = null;
merge = null;
context = null;
lenientAggregatorMerge = null;
}
public SegmentMetadataQuery build()
@ -959,7 +961,8 @@ public class Druids
merge,
context,
analysisTypes,
false
false,
lenientAggregatorMerge
);
}
@ -975,6 +978,7 @@ public class Druids
.toInclude(toInclude)
.analysisTypes(analysisTypesArray)
.merge(merge)
.lenientAggregatorMerge(lenientAggregatorMerge)
.context(builder.context);
}
@ -1032,6 +1036,12 @@ public class Druids
return this;
}
public SegmentMetadataQueryBuilder lenientAggregatorMerge(boolean lenientAggregatorMerge)
{
this.lenientAggregatorMerge = lenientAggregatorMerge;
return this;
}
public SegmentMetadataQueryBuilder context(Map<String, Object> c)
{
context = c;

View File

@ -20,9 +20,11 @@
package io.druid.query.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -41,6 +43,8 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
@ -51,7 +55,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -67,13 +71,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
@Override
public SegmentAnalysis apply(SegmentAnalysis analysis)
{
return new SegmentAnalysis(
analysis.getId(),
analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
analysis.getColumns(),
analysis.getSize(),
analysis.getNumRows()
);
return finalizeAnalysis(analysis);
}
};
@ -139,44 +137,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
@Override
public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
List<Interval> newIntervals = null;
if (query.analyzingInterval()) {
//List returned by arg1.getIntervals() is immutable, so a new list needs to
//be created.
newIntervals = new ArrayList<>(arg1.getIntervals());
newIntervals.addAll(arg2.getIntervals());
}
final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
final String columnName = entry.getKey();
columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
rightColumnNames.remove(columnName);
}
for (String columnName : rightColumnNames) {
columns.put(columnName, rightColumns.get(columnName));
}
return new SegmentAnalysis(
"merged",
newIntervals,
columns,
arg1.getSize() + arg2.getSize(),
arg1.getNumRows() + arg2.getNumRows()
);
return mergeAnalyses(arg1, arg2, query.isLenientAggregatorMerge());
}
};
}
@ -284,4 +245,110 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
)
);
}
@VisibleForTesting
public static SegmentAnalysis mergeAnalyses(
final SegmentAnalysis arg1,
final SegmentAnalysis arg2,
boolean lenientAggregatorMerge
)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
List<Interval> newIntervals = null;
if (arg1.getIntervals() != null) {
newIntervals = Lists.newArrayList();
newIntervals.addAll(arg1.getIntervals());
}
if (arg2.getIntervals() != null) {
if (newIntervals == null) {
newIntervals = Lists.newArrayList();
}
newIntervals.addAll(arg2.getIntervals());
}
final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
final String columnName = entry.getKey();
columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
rightColumnNames.remove(columnName);
}
for (String columnName : rightColumnNames) {
columns.put(columnName, rightColumns.get(columnName));
}
final Map<String, AggregatorFactory> aggregators = Maps.newHashMap();
if (lenientAggregatorMerge) {
// Merge each aggregator individually, ignoring nulls
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
for (AggregatorFactory aggregator : analysis.getAggregators().values()) {
AggregatorFactory merged = aggregators.get(aggregator.getName());
if (merged != null) {
try {
merged = merged.getMergingFactory(aggregator);
}
catch (AggregatorFactoryNotMergeableException e) {
merged = null;
}
} else {
merged = aggregator;
}
aggregators.put(aggregator.getName(), merged);
}
}
}
} else {
final AggregatorFactory[] aggs1 = arg1.getAggregators() != null
? arg1.getAggregators()
.values()
.toArray(new AggregatorFactory[arg1.getAggregators().size()])
: null;
final AggregatorFactory[] aggs2 = arg2.getAggregators() != null
? arg2.getAggregators()
.values()
.toArray(new AggregatorFactory[arg2.getAggregators().size()])
: null;
final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2));
if (merged != null) {
for (AggregatorFactory aggregator : merged) {
aggregators.put(aggregator.getName(), aggregator);
}
}
}
return new SegmentAnalysis(
"merged",
newIntervals,
columns,
arg1.getSize() + arg2.getSize(),
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators
);
}
@VisibleForTesting
public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
{
return new SegmentAnalysis(
analysis.getId(),
analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
analysis.getColumns(),
analysis.getSize(),
analysis.getNumRows(),
analysis.getAggregators()
);
}
}

View File

@ -39,10 +39,12 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.Metadata;
import io.druid.segment.Segment;
import org.joda.time.Interval;
@ -108,6 +110,21 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
}
List<Interval> retIntervals = query.analyzingInterval() ? Arrays.asList(segment.getDataInterval()) : null;
final Map<String, AggregatorFactory> aggregators;
if (query.hasAggregators()) {
final Metadata metadata = segment.asStorageAdapter().getMetadata();
if (metadata != null && metadata.getAggregators() != null) {
aggregators = Maps.newHashMap();
for (AggregatorFactory aggregator : metadata.getAggregators()) {
aggregators.put(aggregator.getName(), aggregator);
}
} else {
aggregators = null;
}
} else {
aggregators = null;
}
return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
@ -115,7 +132,8 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
retIntervals,
columns,
totalSize,
numRows
numRows,
aggregators
)
)
);
@ -168,10 +186,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");

View File

@ -21,10 +21,12 @@ package io.druid.query.metadata.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class SegmentAnalysis implements Comparable<SegmentAnalysis>
{
@ -33,6 +35,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
private final Map<String, ColumnAnalysis> columns;
private final long size;
private final int numRows;
private final Map<String, AggregatorFactory> aggregators;
@JsonCreator
public SegmentAnalysis(
@ -40,8 +43,8 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
@JsonProperty("intervals") List<Interval> interval,
@JsonProperty("columns") Map<String, ColumnAnalysis> columns,
@JsonProperty("size") long size,
@JsonProperty("numRows") int numRows
@JsonProperty("numRows") int numRows,
@JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators
)
{
this.id = id;
@ -49,6 +52,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
this.columns = columns;
this.size = size;
this.numRows = numRows;
this.aggregators = aggregators;
}
@JsonProperty
@ -81,15 +85,10 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
return numRows;
}
public String toDetailedString()
@JsonProperty
public Map<String, AggregatorFactory> getAggregators()
{
return "SegmentAnalysis{" +
"id='" + id + '\'' +
", interval=" + interval +
", columns=" + columns +
", size=" + size +
", numRows=" + numRows +
'}';
return aggregators;
}
@Override
@ -101,9 +100,13 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
", columns=" + columns +
", size=" + size +
", numRows=" + numRows +
", aggregators=" + aggregators +
'}';
}
/**
* Best-effort equals method; relies on AggregatorFactory.equals, which is not guaranteed to be sanely implemented.
*/
@Override
public boolean equals(Object o)
{
@ -113,36 +116,23 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentAnalysis that = (SegmentAnalysis) o;
if (size != that.size) {
return false;
}
if (numRows != that.numRows) {
return false;
}
if (id != null ? !id.equals(that.id) : that.id != null) {
return false;
}
if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
return false;
}
return !(columns != null ? !columns.equals(that.columns) : that.columns != null);
return size == that.size &&
numRows == that.numRows &&
Objects.equals(id, that.id) &&
Objects.equals(interval, that.interval) &&
Objects.equals(columns, that.columns) &&
Objects.equals(aggregators, that.aggregators);
}
/**
* Best-effort hashCode method; relies on AggregatorFactory.hashCode, which is not guaranteed to be sanely
* implemented.
*/
@Override
public int hashCode()
{
int result = id != null ? id.hashCode() : 0;
result = 31 * result + (interval != null ? interval.hashCode() : 0);
result = 31 * result + (columns != null ? columns.hashCode() : 0);
result = 31 * result + (int) (size ^ (size >>> 32));
result = 31 * result + (int) (numRows ^ (numRows >>> 32));
return result;
return Objects.hash(id, interval, columns, size, numRows, aggregators);
}
@Override

View File

@ -38,6 +38,7 @@ import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{
@ -51,7 +52,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{
CARDINALITY,
SIZE,
INTERVAL;
INTERVAL,
AGGREGATORS;
@JsonValue
@Override
@ -86,6 +88,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
private final boolean merge;
private final boolean usingDefaultInterval;
private final EnumSet<AnalysisType> analysisTypes;
private final boolean lenientAggregatorMerge;
@JsonCreator
public SegmentMetadataQuery(
@ -95,7 +98,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
@JsonProperty("merge") Boolean merge,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("analysisTypes") EnumSet<AnalysisType> analysisTypes,
@JsonProperty("usingDefaultInterval") Boolean useDefaultInterval
@JsonProperty("usingDefaultInterval") Boolean useDefaultInterval,
@JsonProperty("lenientAggregatorMerge") Boolean lenientAggregatorMerge
)
{
super(
@ -118,6 +122,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
dataSource instanceof TableDataSource,
"SegmentMetadataQuery only supports table datasource"
);
this.lenientAggregatorMerge = lenientAggregatorMerge == null ? false : lenientAggregatorMerge;
}
@JsonProperty
@ -156,11 +161,22 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
return analysisTypes;
}
@JsonProperty
public boolean isLenientAggregatorMerge()
{
return lenientAggregatorMerge;
}
public boolean analyzingInterval()
{
return analysisTypes.contains(AnalysisType.INTERVAL);
}
public boolean hasAggregators()
{
return analysisTypes.contains(AnalysisType.AGGREGATORS);
}
public byte[] getAnalysisTypesCacheKey()
{
int size = 1;
@ -191,7 +207,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
merge,
computeOverridenContext(contextOverride),
analysisTypes,
usingDefaultInterval
usingDefaultInterval,
lenientAggregatorMerge
);
}
@ -205,7 +222,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
merge,
getContext(),
analysisTypes,
usingDefaultInterval
usingDefaultInterval,
lenientAggregatorMerge
);
}
@ -219,7 +237,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
merge,
getContext(),
analysisTypes,
usingDefaultInterval
usingDefaultInterval,
lenientAggregatorMerge
);
}
@ -233,6 +252,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
", merge=" + merge +
", usingDefaultInterval=" + usingDefaultInterval +
", analysisTypes=" + analysisTypes +
", lenientAggregatorMerge=" + lenientAggregatorMerge +
'}';
}
@ -248,31 +268,24 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
if (!super.equals(o)) {
return false;
}
SegmentMetadataQuery that = (SegmentMetadataQuery) o;
if (merge != that.merge) {
return false;
}
if (usingDefaultInterval != that.usingDefaultInterval) {
return false;
}
if (!analysisTypes.equals(that.analysisTypes)) {
return false;
}
return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null);
return merge == that.merge &&
usingDefaultInterval == that.usingDefaultInterval &&
lenientAggregatorMerge == that.lenientAggregatorMerge &&
Objects.equals(toInclude, that.toInclude) &&
Objects.equals(analysisTypes, that.analysisTypes);
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
result = 31 * result + (merge ? 1 : 0);
result = 31 * result + (usingDefaultInterval ? 1 : 0);
result = 31 * result + analysisTypes.hashCode();
return result;
return Objects.hash(
super.hashCode(),
toInclude,
merge,
usingDefaultInterval,
analysisTypes,
lenientAggregatorMerge
);
}
}

View File

@ -899,4 +899,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
@Override
public Metadata getMetadata()
{
return index.getMetadata();
}
}

View File

@ -55,4 +55,5 @@ public interface StorageAdapter extends CursorFactory
public String getColumnTypeName(String column);
public int getNumRows();
public DateTime getMaxIngestedEventTime();
public Metadata getMetadata();
}

View File

@ -41,6 +41,7 @@ import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.Metadata;
import io.druid.segment.NullDimensionSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.SingleScanTimeDimSelector;
@ -734,4 +735,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
};
}
}
@Override
public Metadata getMetadata()
{
return index.getMetadata();
}
}

View File

@ -171,7 +171,7 @@ public class SegmentAnalyzerTest
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(
new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, analyses, false
new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, analyses, false, false
);
HashMap<String, Object> context = new HashMap<String, Object>();
return Sequences.toList(query.run(runner, context), Lists.<SegmentAnalysis>newArrayList());

View File

@ -23,9 +23,15 @@ package io.druid.query.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
@ -35,19 +41,22 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class SegmentMetadataQueryQueryToolChestTest
{
@Test
public void testCacheStrategy() throws Exception
{
SegmentMetadataQuery query = new SegmentMetadataQuery(
new TableDataSource("dummy"),
QuerySegmentSpecs.create("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false
new TableDataSource("dummy"),
QuerySegmentSpecs.create("2015-01-01/2015-01-02"),
null,
null,
null,
null,
false,
false
);
CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> strategy =
@ -73,7 +82,8 @@ public class SegmentMetadataQueryQueryToolChestTest
null
)
), 71982,
100
100,
null
);
Object preparedValue = strategy.prepareForCache().apply(result);
@ -88,4 +98,162 @@ public class SegmentMetadataQueryQueryToolChestTest
Assert.assertEquals(result, fromCacheResult);
}
@Test
public void testMergeAggregators()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
)
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
)
);
Assert.assertEquals(
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
mergeStrict(analysis1, analysis2).getAggregators()
);
Assert.assertEquals(
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar"),
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
mergeLenient(analysis1, analysis2).getAggregators()
);
}
@Test
public void testMergeAggregatorsOneNull()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
)
);
Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
Assert.assertEquals(
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
mergeLenient(analysis1, analysis2).getAggregators()
);
}
@Test
public void testMergeAggregatorsAllNull()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null
);
Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
Assert.assertNull(mergeLenient(analysis1, analysis2).getAggregators());
}
@Test
public void testMergeAggregatorsConflict()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleSumAggregatorFactory("bar", "bar")
)
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
ImmutableMap.of(
"foo", new LongSumAggregatorFactory("foo", "foo"),
"bar", new DoubleMaxAggregatorFactory("bar", "bar"),
"baz", new LongMaxAggregatorFactory("baz", "baz")
)
);
final Map<String, AggregatorFactory> expectedLenient = Maps.newHashMap();
expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
expectedLenient.put("bar", null);
expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2).getAggregators());
}
private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
{
return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
SegmentMetadataQueryQueryToolChest.mergeAnalyses(
analysis1,
analysis2,
false
)
);
}
private static SegmentAnalysis mergeLenient(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
{
return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
SegmentMetadataQueryQueryToolChest.mergeAnalyses(
analysis1,
analysis2,
true
)
);
}
}

View File

@ -39,6 +39,7 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
@ -59,6 +60,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -150,7 +152,8 @@ public class SegmentMetadataQueryTest
null
)
), usingMmappedSegment ? 71982 : 32643,
1209
1209,
null
);
}
@ -191,7 +194,8 @@ public class SegmentMetadataQueryTest
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2
expectedSegmentAnalysis.getNumRows() * 2,
null
);
QueryToolChest toolChest = FACTORY.getToolchest();
@ -250,7 +254,8 @@ public class SegmentMetadataQueryTest
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2
expectedSegmentAnalysis.getNumRows() * 2,
null
);
QueryToolChest toolChest = FACTORY.getToolchest();
@ -317,7 +322,8 @@ public class SegmentMetadataQueryTest
)
),
expectedSegmentAnalysis.getSize() * 2,
expectedSegmentAnalysis.getNumRows() * 2
expectedSegmentAnalysis.getNumRows() * 2,
null
);
QueryToolChest toolChest = FACTORY.getToolchest();
@ -362,7 +368,8 @@ public class SegmentMetadataQueryTest
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2
expectedSegmentAnalysis.getNumRows() * 2,
null
);
QueryToolChest toolChest = FACTORY.getToolchest();
@ -396,6 +403,62 @@ public class SegmentMetadataQueryTest
exec.shutdownNow();
}
@Test
public void testSegmentMetadataQueryWithAggregatorsMerge()
{
final Map<String, AggregatorFactory> expectedAggregators = Maps.newHashMap();
for (AggregatorFactory agg : TestIndex.METRIC_AGGS) {
expectedAggregators.put(agg.getName(), agg.getCombiningFactory());
}
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged",
null,
ImmutableMap.of(
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
0,
0,
null
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2,
expectedAggregators
);
QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
),
toolChest
);
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(
Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
.merge(true)
.build(),
Maps.newHashMap()
),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
}
@Test
public void testBySegmentResults()
{

View File

@ -74,7 +74,7 @@ public class TestIndex
public static final String[] METRICS = new String[]{"index"};
private static final Logger log = new Logger(TestIndex.class);
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
public static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
};