mirror of https://github.com/apache/druid.git
SegmentMetadataQuery support for returning aggregators.
parent 5a9cd89059
commit d416279c14
@@ -31,6 +31,7 @@ There are several main parts to a segment metadata query:

|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.html)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size", "interval"]. See section [analysisTypes](#analysistypes) for more details.|no|
|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no|

The format of the result is:

@@ -44,6 +45,9 @@ The format of the result is:

    "dim2" : { "type" : "STRING", "hasMultipleValues" : true, "size" : 100000, "cardinality" : 1504, "errorMessage" : null },
    "metric1" : { "type" : "FLOAT", "hasMultipleValues" : false, "size" : 100000, "cardinality" : null, "errorMessage" : null }
  },
  "aggregators" : {
    "metric1" : { "type" : "longSum", "name" : "metric1", "fieldName" : "metric1" }
  },
  "size" : 300000,
  "numRows" : 5000000
} ]
@@ -99,18 +103,39 @@ This is a list of properties that determines the amount of information returned

By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.

There are 3 types of column analyses:
There are four types of column analyses:

#### cardinality

* Estimated floor of cardinality for each column. Only relevant for dimension columns.
* `cardinality` in the result will return the estimated floor of cardinality for each column. Only relevant for
dimension columns.

#### size

* Estimated byte size for the segment columns if they were stored in a flat format
* Estimated total segment byte size in if it was stored in a flat format
* `size` in the result will contain the estimated total segment byte size as if the data were stored in text format

#### interval

* If present, the SegmentMetadataQuery will return the list of intervals associated with the queried segments.
* `intervals` in the result will contain the list of intervals associated with the queried segments.

#### aggregators

* `aggregators` in the result will contain the list of aggregators usable for querying metric columns. This may be
null if the aggregators are unknown or unmergeable (if merging is enabled).

* Merging can be strict or lenient. See *lenientAggregatorMerge* below for details.

* The form of the result is a map of column name to aggregator.
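For Java callers, the same options can be set through `Druids.newSegmentMetadataQueryBuilder()`, which this commit extends with a `lenientAggregatorMerge` setter. A minimal sketch, modeled on the builder usage in this commit's tests; the datasource name and interval are placeholders:

```java
// Sketch only: "testing" and "2013/2014" are placeholder values.
SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
    .dataSource("testing")
    .intervals("2013/2014")
    .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
    .lenientAggregatorMerge(true)
    .merge(true)
    .build();
```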
### lenientAggregatorMerge

Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
two segments use incompatible aggregators for the same column (e.g. longSum changed to doubleSum).

Aggregators can be merged strictly (the default) or leniently. With strict merging, if there are any segments
with unknown aggregators, or any conflicts of any kind, the merged aggregators list will be `null`. With lenient
merging, segments with unknown aggregators will be ignored, and conflicts between aggregators will only null out
the aggregator for that particular column.

In particular, with lenient merging, it is possible for an individual column's aggregator to be `null`. This will not
occur with strict merging.
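To make the strict-versus-lenient behavior concrete, here is a small sketch that merges two per-segment analyses which disagree on the aggregator for one column, using the static helpers this commit adds to `SegmentMetadataQueryQueryToolChest` (segment ids, empty column maps, and zero sizes are placeholders; the outcomes mirror the unit tests added later in this commit):

```java
// Two per-segment analyses that disagree on the aggregator for column "bar".
SegmentAnalysis left = new SegmentAnalysis(
    "segment1", null, Maps.<String, ColumnAnalysis>newHashMap(), 0, 0,
    ImmutableMap.<String, AggregatorFactory>of("bar", new DoubleSumAggregatorFactory("bar", "bar"))
);
SegmentAnalysis right = new SegmentAnalysis(
    "segment2", null, Maps.<String, ColumnAnalysis>newHashMap(), 0, 0,
    ImmutableMap.<String, AggregatorFactory>of("bar", new DoubleMaxAggregatorFactory("bar", "bar"))
);

// Strict merge: any conflict nulls out the merged aggregators map as a whole.
SegmentAnalysis strict = SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
    SegmentMetadataQueryQueryToolChest.mergeAnalyses(left, right, false)
);
// strict.getAggregators() == null

// Lenient merge: only the conflicting column's aggregator becomes null.
SegmentAnalysis lenient = SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
    SegmentMetadataQueryQueryToolChest.mergeAnalyses(left, right, true)
);
// lenient.getAggregators().get("bar") == null
```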
@@ -938,6 +938,7 @@ public class Druids

    private ColumnIncluderator toInclude;
    private EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
    private Boolean merge;
    private Boolean lenientAggregatorMerge;
    private Map<String, Object> context;

    public SegmentMetadataQueryBuilder()

@@ -948,6 +949,7 @@ public class Druids

      analysisTypes = null;
      merge = null;
      context = null;
      lenientAggregatorMerge = null;
    }

    public SegmentMetadataQuery build()

@@ -959,7 +961,8 @@ public class Druids

          merge,
          context,
          analysisTypes,
          false
          false,
          lenientAggregatorMerge
      );
    }

@@ -975,6 +978,7 @@ public class Druids

          .toInclude(toInclude)
          .analysisTypes(analysisTypesArray)
          .merge(merge)
          .lenientAggregatorMerge(lenientAggregatorMerge)
          .context(builder.context);
    }

@@ -1032,6 +1036,12 @@ public class Druids

      return this;
    }

    public SegmentMetadataQueryBuilder lenientAggregatorMerge(boolean lenientAggregatorMerge)
    {
      this.lenientAggregatorMerge = lenientAggregatorMerge;
      return this;
    }

    public SegmentMetadataQueryBuilder context(Map<String, Object> c)
    {
      context = c;
@@ -20,9 +20,11 @@

package io.druid.query.metadata;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@@ -41,6 +43,8 @@ import io.druid.query.Query;

import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;

@@ -51,7 +55,7 @@ import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -67,13 +71,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn

      @Override
      public SegmentAnalysis apply(SegmentAnalysis analysis)
      {
        return new SegmentAnalysis(
            analysis.getId(),
            analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
            analysis.getColumns(),
            analysis.getSize(),
            analysis.getNumRows()
        );
        return finalizeAnalysis(analysis);
      }
    };

@@ -139,44 +137,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn

      @Override
      public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
      {
        if (arg1 == null) {
          return arg2;
        }

        if (arg2 == null) {
          return arg1;
        }

        List<Interval> newIntervals = null;
        if (query.analyzingInterval()) {
          //List returned by arg1.getIntervals() is immutable, so a new list needs to
          //be created.
          newIntervals = new ArrayList<>(arg1.getIntervals());
          newIntervals.addAll(arg2.getIntervals());
        }

        final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
        final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
        Map<String, ColumnAnalysis> columns = Maps.newTreeMap();

        Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
        for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
          final String columnName = entry.getKey();
          columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
          rightColumnNames.remove(columnName);
        }

        for (String columnName : rightColumnNames) {
          columns.put(columnName, rightColumns.get(columnName));
        }

        return new SegmentAnalysis(
            "merged",
            newIntervals,
            columns,
            arg1.getSize() + arg2.getSize(),
            arg1.getNumRows() + arg2.getNumRows()
        );
        return mergeAnalyses(arg1, arg2, query.isLenientAggregatorMerge());
      }
    };
  }

@@ -284,4 +245,110 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn

        )
    );
  }

  @VisibleForTesting
  public static SegmentAnalysis mergeAnalyses(
      final SegmentAnalysis arg1,
      final SegmentAnalysis arg2,
      boolean lenientAggregatorMerge
  )
  {
    if (arg1 == null) {
      return arg2;
    }

    if (arg2 == null) {
      return arg1;
    }

    List<Interval> newIntervals = null;
    if (arg1.getIntervals() != null) {
      newIntervals = Lists.newArrayList();
      newIntervals.addAll(arg1.getIntervals());
    }
    if (arg2.getIntervals() != null) {
      if (newIntervals == null) {
        newIntervals = Lists.newArrayList();
      }
      newIntervals.addAll(arg2.getIntervals());
    }

    final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
    final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
    Map<String, ColumnAnalysis> columns = Maps.newTreeMap();

    Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
    for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
      final String columnName = entry.getKey();
      columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
      rightColumnNames.remove(columnName);
    }

    for (String columnName : rightColumnNames) {
      columns.put(columnName, rightColumns.get(columnName));
    }

    final Map<String, AggregatorFactory> aggregators = Maps.newHashMap();

    if (lenientAggregatorMerge) {
      // Merge each aggregator individually, ignoring nulls
      for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
        if (analysis.getAggregators() != null) {
          for (AggregatorFactory aggregator : analysis.getAggregators().values()) {
            AggregatorFactory merged = aggregators.get(aggregator.getName());
            if (merged != null) {
              try {
                merged = merged.getMergingFactory(aggregator);
              }
              catch (AggregatorFactoryNotMergeableException e) {
                merged = null;
              }
            } else {
              merged = aggregator;
            }
            aggregators.put(aggregator.getName(), merged);
          }
        }
      }
    } else {
      final AggregatorFactory[] aggs1 = arg1.getAggregators() != null
                                        ? arg1.getAggregators()
                                              .values()
                                              .toArray(new AggregatorFactory[arg1.getAggregators().size()])
                                        : null;
      final AggregatorFactory[] aggs2 = arg2.getAggregators() != null
                                        ? arg2.getAggregators()
                                              .values()
                                              .toArray(new AggregatorFactory[arg2.getAggregators().size()])
                                        : null;
      final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2));
      if (merged != null) {
        for (AggregatorFactory aggregator : merged) {
          aggregators.put(aggregator.getName(), aggregator);
        }
      }
    }

    return new SegmentAnalysis(
        "merged",
        newIntervals,
        columns,
        arg1.getSize() + arg2.getSize(),
        arg1.getNumRows() + arg2.getNumRows(),
        aggregators.isEmpty() ? null : aggregators
    );
  }

  @VisibleForTesting
  public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
  {
    return new SegmentAnalysis(
        analysis.getId(),
        analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
        analysis.getColumns(),
        analysis.getSize(),
        analysis.getNumRows(),
        analysis.getAggregators()
    );
  }
}
@@ -39,10 +39,12 @@ import io.druid.query.QueryRunner;

import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.Metadata;
import io.druid.segment.Segment;
import org.joda.time.Interval;

@@ -108,6 +110,21 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg

        }
        List<Interval> retIntervals = query.analyzingInterval() ? Arrays.asList(segment.getDataInterval()) : null;

        final Map<String, AggregatorFactory> aggregators;
        if (query.hasAggregators()) {
          final Metadata metadata = segment.asStorageAdapter().getMetadata();
          if (metadata != null && metadata.getAggregators() != null) {
            aggregators = Maps.newHashMap();
            for (AggregatorFactory aggregator : metadata.getAggregators()) {
              aggregators.put(aggregator.getName(), aggregator);
            }
          } else {
            aggregators = null;
          }
        } else {
          aggregators = null;
        }

        return Sequences.simple(
            Arrays.asList(
                new SegmentAnalysis(

@@ -115,7 +132,8 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg

                    retIntervals,
                    columns,
                    totalSize,
                    numRows
                    numRows,
                    aggregators
                )
            )
        );

@@ -168,10 +186,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg

        future.cancel(true);
        throw new QueryInterruptedException("Query interrupted");
      }
      catch(CancellationException e) {
      catch (CancellationException e) {
        throw new QueryInterruptedException("Query cancelled");
      }
      catch(TimeoutException e) {
      catch (TimeoutException e) {
        log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
        future.cancel(true);
        throw new QueryInterruptedException("Query timeout");
@@ -21,10 +21,12 @@ package io.druid.query.metadata.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;

import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SegmentAnalysis implements Comparable<SegmentAnalysis>
{

@@ -33,6 +35,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>

  private final Map<String, ColumnAnalysis> columns;
  private final long size;
  private final int numRows;
  private final Map<String, AggregatorFactory> aggregators;

  @JsonCreator
  public SegmentAnalysis(

@@ -40,8 +43,8 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>

      @JsonProperty("intervals") List<Interval> interval,
      @JsonProperty("columns") Map<String, ColumnAnalysis> columns,
      @JsonProperty("size") long size,
      @JsonProperty("numRows") int numRows
      @JsonProperty("numRows") int numRows,
      @JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators
  )
  {
    this.id = id;

@@ -49,6 +52,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>

    this.columns = columns;
    this.size = size;
    this.numRows = numRows;
    this.aggregators = aggregators;
  }

  @JsonProperty

@@ -81,15 +85,10 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>

    return numRows;
  }

  public String toDetailedString()
  @JsonProperty
  public Map<String, AggregatorFactory> getAggregators()
  {
    return "SegmentAnalysis{" +
           "id='" + id + '\'' +
           ", interval=" + interval +
           ", columns=" + columns +
           ", size=" + size +
           ", numRows=" + numRows +
           '}';
    return aggregators;
  }

  @Override

@@ -101,9 +100,13 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>

           ", columns=" + columns +
           ", size=" + size +
           ", numRows=" + numRows +
           ", aggregators=" + aggregators +
           '}';
  }

  /**
   * Best-effort equals method; relies on AggregatorFactory.equals, which is not guaranteed to be sanely implemented.
   */
  @Override
  public boolean equals(Object o)
  {

@@ -113,36 +116,23 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>

    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    SegmentAnalysis that = (SegmentAnalysis) o;

    if (size != that.size) {
      return false;
    }

    if (numRows != that.numRows) {
      return false;
    }

    if (id != null ? !id.equals(that.id) : that.id != null) {
      return false;
    }
    if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
      return false;
    }
    return !(columns != null ? !columns.equals(that.columns) : that.columns != null);

    return size == that.size &&
           numRows == that.numRows &&
           Objects.equals(id, that.id) &&
           Objects.equals(interval, that.interval) &&
           Objects.equals(columns, that.columns) &&
           Objects.equals(aggregators, that.aggregators);
  }

  /**
   * Best-effort hashCode method; relies on AggregatorFactory.hashCode, which is not guaranteed to be sanely
   * implemented.
   */
  @Override
  public int hashCode()
  {
    int result = id != null ? id.hashCode() : 0;
    result = 31 * result + (interval != null ? interval.hashCode() : 0);
    result = 31 * result + (columns != null ? columns.hashCode() : 0);
    result = 31 * result + (int) (size ^ (size >>> 32));
    result = 31 * result + (int) (numRows ^ (numRows >>> 32));
    return result;
    return Objects.hash(id, interval, columns, size, numRows, aggregators);
  }

  @Override
@@ -38,6 +38,7 @@ import java.util.Arrays;

import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{

@@ -51,7 +52,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

  {
    CARDINALITY,
    SIZE,
    INTERVAL;
    INTERVAL,
    AGGREGATORS;

    @JsonValue
    @Override

@@ -86,6 +88,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

  private final boolean merge;
  private final boolean usingDefaultInterval;
  private final EnumSet<AnalysisType> analysisTypes;
  private final boolean lenientAggregatorMerge;

  @JsonCreator
  public SegmentMetadataQuery(

@@ -95,7 +98,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

      @JsonProperty("merge") Boolean merge,
      @JsonProperty("context") Map<String, Object> context,
      @JsonProperty("analysisTypes") EnumSet<AnalysisType> analysisTypes,
      @JsonProperty("usingDefaultInterval") Boolean useDefaultInterval
      @JsonProperty("usingDefaultInterval") Boolean useDefaultInterval,
      @JsonProperty("lenientAggregatorMerge") Boolean lenientAggregatorMerge
  )
  {
    super(

@@ -118,6 +122,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

        dataSource instanceof TableDataSource,
        "SegmentMetadataQuery only supports table datasource"
    );
    this.lenientAggregatorMerge = lenientAggregatorMerge == null ? false : lenientAggregatorMerge;
  }

  @JsonProperty

@@ -156,11 +161,22 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

    return analysisTypes;
  }

  @JsonProperty
  public boolean isLenientAggregatorMerge()
  {
    return lenientAggregatorMerge;
  }

  public boolean analyzingInterval()
  {
    return analysisTypes.contains(AnalysisType.INTERVAL);
  }

  public boolean hasAggregators()
  {
    return analysisTypes.contains(AnalysisType.AGGREGATORS);
  }

  public byte[] getAnalysisTypesCacheKey()
  {
    int size = 1;

@@ -191,7 +207,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

        merge,
        computeOverridenContext(contextOverride),
        analysisTypes,
        usingDefaultInterval
        usingDefaultInterval,
        lenientAggregatorMerge
    );
  }

@@ -205,7 +222,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

        merge,
        getContext(),
        analysisTypes,
        usingDefaultInterval
        usingDefaultInterval,
        lenientAggregatorMerge
    );
  }

@@ -219,7 +237,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

        merge,
        getContext(),
        analysisTypes,
        usingDefaultInterval
        usingDefaultInterval,
        lenientAggregatorMerge
    );
  }

@@ -233,6 +252,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

           ", merge=" + merge +
           ", usingDefaultInterval=" + usingDefaultInterval +
           ", analysisTypes=" + analysisTypes +
           ", lenientAggregatorMerge=" + lenientAggregatorMerge +
           '}';
  }

@@ -248,31 +268,24 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>

    if (!super.equals(o)) {
      return false;
    }

    SegmentMetadataQuery that = (SegmentMetadataQuery) o;

    if (merge != that.merge) {
      return false;
    }
    if (usingDefaultInterval != that.usingDefaultInterval) {
      return false;
    }

    if (!analysisTypes.equals(that.analysisTypes)) {
      return false;
    }
    return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null);

    return merge == that.merge &&
           usingDefaultInterval == that.usingDefaultInterval &&
           lenientAggregatorMerge == that.lenientAggregatorMerge &&
           Objects.equals(toInclude, that.toInclude) &&
           Objects.equals(analysisTypes, that.analysisTypes);
  }

  @Override
  public int hashCode()
  {
    int result = super.hashCode();
    result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
    result = 31 * result + (merge ? 1 : 0);
    result = 31 * result + (usingDefaultInterval ? 1 : 0);
    result = 31 * result + analysisTypes.hashCode();
    return result;
    return Objects.hash(
        super.hashCode(),
        toInclude,
        merge,
        usingDefaultInterval,
        analysisTypes,
        lenientAggregatorMerge
    );
  }
}
@@ -899,4 +899,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter

    }
  }

  @Override
  public Metadata getMetadata()
  {
    return index.getMetadata();
  }
}
@@ -55,4 +55,5 @@ public interface StorageAdapter extends CursorFactory

  public String getColumnTypeName(String column);
  public int getNumRows();
  public DateTime getMaxIngestedEventTime();
  public Metadata getMetadata();
}
@@ -41,6 +41,7 @@ import io.druid.segment.Cursor;

import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.Metadata;
import io.druid.segment.NullDimensionSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.SingleScanTimeDimSelector;

@@ -734,4 +735,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter

      };
    }
  }

  @Override
  public Metadata getMetadata()
  {
    return index.getMetadata();
  }
}
@@ -171,7 +171,7 @@ public class SegmentAnalyzerTest

    );

    final SegmentMetadataQuery query = new SegmentMetadataQuery(
        new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, analyses, false
        new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, analyses, false, false
    );
    HashMap<String, Object> context = new HashMap<String, Object>();
    return Sequences.toList(query.run(runner, context), Lists.<SegmentAnalysis>newArrayList());
@@ -23,9 +23,15 @@ package io.druid.query.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;

@@ -35,19 +41,22 @@ import org.joda.time.Interval;

import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

public class SegmentMetadataQueryQueryToolChestTest
{
  @Test
  public void testCacheStrategy() throws Exception
  {
    SegmentMetadataQuery query = new SegmentMetadataQuery(
        new TableDataSource("dummy"),
        QuerySegmentSpecs.create("2015-01-01/2015-01-02"),
        null,
        null,
        null,
        null,
        false
        new TableDataSource("dummy"),
        QuerySegmentSpecs.create("2015-01-01/2015-01-02"),
        null,
        null,
        null,
        null,
        false,
        false
    );

    CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> strategy =

@@ -73,7 +82,8 @@ public class SegmentMetadataQueryQueryToolChestTest

            null
        )
    ), 71982,
    100
    100,
    null
    );

    Object preparedValue = strategy.prepareForCache().apply(result);

@@ -88,4 +98,162 @@ public class SegmentMetadataQueryQueryToolChestTest

    Assert.assertEquals(result, fromCacheResult);
  }

  @Test
  public void testMergeAggregators()
  {
    final SegmentAnalysis analysis1 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "baz", new DoubleSumAggregatorFactory("baz", "baz")
        )
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        )
    );

    Assert.assertEquals(
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar"),
            "baz", new DoubleSumAggregatorFactory("baz", "baz")
        ),
        mergeStrict(analysis1, analysis2).getAggregators()
    );
    Assert.assertEquals(
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar"),
            "baz", new DoubleSumAggregatorFactory("baz", "baz")
        ),
        mergeLenient(analysis1, analysis2).getAggregators()
    );
  }

  @Test
  public void testMergeAggregatorsOneNull()
  {
    final SegmentAnalysis analysis1 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        null
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        )
    );

    Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
    Assert.assertEquals(
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        ),
        mergeLenient(analysis1, analysis2).getAggregators()
    );
  }

  @Test
  public void testMergeAggregatorsAllNull()
  {
    final SegmentAnalysis analysis1 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        null
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        null
    );

    Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
    Assert.assertNull(mergeLenient(analysis1, analysis2).getAggregators());
  }

  @Test
  public void testMergeAggregatorsConflict()
  {
    final SegmentAnalysis analysis1 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        )
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
        "id",
        null,
        Maps.<String, ColumnAnalysis>newHashMap(),
        0,
        0,
        ImmutableMap.of(
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
            "baz", new LongMaxAggregatorFactory("baz", "baz")
        )
    );

    final Map<String, AggregatorFactory> expectedLenient = Maps.newHashMap();
    expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo"));
    expectedLenient.put("bar", null);
    expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz"));
    Assert.assertNull(mergeStrict(analysis1, analysis2).getAggregators());
    Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2).getAggregators());
  }

  private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
  {
    return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
        SegmentMetadataQueryQueryToolChest.mergeAnalyses(
            analysis1,
            analysis2,
            false
        )
    );
  }

  private static SegmentAnalysis mergeLenient(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
  {
    return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(
        SegmentMetadataQueryQueryToolChest.mergeAnalyses(
            analysis1,
            analysis2,
            true
        )
    );
  }
}
@@ -39,6 +39,7 @@ import io.druid.query.QueryRunnerFactory;

import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;

@@ -59,6 +60,7 @@ import java.util.Arrays;

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@@ -150,7 +152,8 @@ public class SegmentMetadataQueryTest

            null
        )
    ), usingMmappedSegment ? 71982 : 32643,
    1209
    1209,
    null
    );
  }

@@ -191,7 +194,8 @@ public class SegmentMetadataQueryTest

        )
        ),
        0,
        expectedSegmentAnalysis.getNumRows() * 2
        expectedSegmentAnalysis.getNumRows() * 2,
        null
    );

    QueryToolChest toolChest = FACTORY.getToolchest();

@@ -250,7 +254,8 @@ public class SegmentMetadataQueryTest

        )
        ),
        0,
        expectedSegmentAnalysis.getNumRows() * 2
        expectedSegmentAnalysis.getNumRows() * 2,
        null
    );

    QueryToolChest toolChest = FACTORY.getToolchest();

@@ -317,7 +322,8 @@ public class SegmentMetadataQueryTest

        )
        ),
        expectedSegmentAnalysis.getSize() * 2,
        expectedSegmentAnalysis.getNumRows() * 2
        expectedSegmentAnalysis.getNumRows() * 2,
        null
    );

    QueryToolChest toolChest = FACTORY.getToolchest();

@@ -362,7 +368,8 @@ public class SegmentMetadataQueryTest

        )
        ),
        0,
        expectedSegmentAnalysis.getNumRows() * 2
        expectedSegmentAnalysis.getNumRows() * 2,
        null
    );

    QueryToolChest toolChest = FACTORY.getToolchest();

@@ -396,6 +403,62 @@ public class SegmentMetadataQueryTest

    exec.shutdownNow();
  }

  @Test
  public void testSegmentMetadataQueryWithAggregatorsMerge()
  {
    final Map<String, AggregatorFactory> expectedAggregators = Maps.newHashMap();
    for (AggregatorFactory agg : TestIndex.METRIC_AGGS) {
      expectedAggregators.put(agg.getName(), agg.getCombiningFactory());
    }
    SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
        "merged",
        null,
        ImmutableMap.of(
            "placement",
            new ColumnAnalysis(
                ValueType.STRING.toString(),
                false,
                0,
                0,
                null
            )
        ),
        0,
        expectedSegmentAnalysis.getNumRows() * 2,
        expectedAggregators
    );

    QueryToolChest toolChest = FACTORY.getToolchest();

    QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
    ExecutorService exec = Executors.newCachedThreadPool();
    QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
        toolChest.mergeResults(
            FACTORY.mergeRunners(
                MoreExecutors.sameThreadExecutor(),
                Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
            )
        ),
        toolChest
    );

    TestHelper.assertExpectedObjects(
        ImmutableList.of(mergedSegmentAnalysis),
        myRunner.run(
            Druids.newSegmentMetadataQueryBuilder()
                  .dataSource("testing")
                  .intervals("2013/2014")
                  .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
                  .analysisTypes(SegmentMetadataQuery.AnalysisType.AGGREGATORS)
                  .merge(true)
                  .build(),
            Maps.newHashMap()
        ),
        "failed SegmentMetadata merging query"
    );
    exec.shutdownNow();
  }

  @Test
  public void testBySegmentResults()
  {
@@ -74,7 +74,7 @@ public class TestIndex

  public static final String[] METRICS = new String[]{"index"};
  private static final Logger log = new Logger(TestIndex.class);
  private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
  private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
  public static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
      new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
      new HyperUniquesAggregatorFactory("quality_uniques", "quality")
  };