mirror of https://github.com/apache/druid.git
1) Take in g9yuayon's pull request, fix merge conflicts and formatting
This commit is contained in:
parent
7d34710edf
commit
8bd19e9d04
|
@ -22,11 +22,14 @@ package com.metamx.druid.query.group;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.druid.BaseQuery;
|
||||
import com.metamx.druid.Query;
|
||||
import com.metamx.druid.QueryGranularity;
|
||||
|
@ -37,13 +40,15 @@ import com.metamx.druid.query.Queries;
|
|||
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
|
||||
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||
import com.metamx.druid.query.filter.DimFilter;
|
||||
import com.metamx.druid.query.group.limit.DefaultLimitSpec;
|
||||
import com.metamx.druid.query.group.limit.LimitSpec;
|
||||
import com.metamx.druid.query.group.limit.NoopLimitSpec;
|
||||
import com.metamx.druid.query.group.limit.OrderByColumnSpec;
|
||||
import com.metamx.druid.query.group.having.HavingSpec;
|
||||
import com.metamx.druid.query.group.orderby.DefaultLimitSpec;
|
||||
import com.metamx.druid.query.group.orderby.LimitSpec;
|
||||
import com.metamx.druid.query.group.orderby.NoopLimitSpec;
|
||||
import com.metamx.druid.query.group.orderby.OrderByColumnSpec;
|
||||
import com.metamx.druid.query.segment.LegacySegmentSpec;
|
||||
import com.metamx.druid.query.segment.QuerySegmentSpec;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -57,6 +62,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
}
|
||||
|
||||
private final LimitSpec limitSpec;
|
||||
private final HavingSpec havingSpec;
|
||||
private final DimFilter dimFilter;
|
||||
private final QueryGranularity granularity;
|
||||
private final List<DimensionSpec> dimensions;
|
||||
|
@ -69,34 +75,88 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
public GroupByQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||
@JsonProperty("limitSpec") LimitSpec limitSpec,
|
||||
@JsonProperty("filter") DimFilter dimFilter,
|
||||
@JsonProperty("granularity") QueryGranularity granularity,
|
||||
@JsonProperty("dimensions") List<DimensionSpec> dimensions,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
||||
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
||||
@JsonProperty("having") HavingSpec havingSpec,
|
||||
@JsonProperty("limitSpec") LimitSpec limitSpec,
|
||||
@JsonProperty("orderBy") LimitSpec orderBySpec,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity;
|
||||
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
|
||||
this.aggregatorSpecs = aggregatorSpecs;
|
||||
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
|
||||
this.havingSpec = havingSpec;
|
||||
this.limitSpec = (limitSpec == null) ? (orderBySpec == null ? new NoopLimitSpec() : orderBySpec) : limitSpec;
|
||||
|
||||
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
|
||||
Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
|
||||
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
|
||||
|
||||
orderByLimitFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
|
||||
Function<Sequence<Row>, Sequence<Row>> postProcFn =
|
||||
this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
|
||||
|
||||
if (havingSpec != null) {
|
||||
postProcFn = Functions.compose(
|
||||
new Function<Sequence<Row>, Sequence<Row>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Row> apply(@Nullable Sequence<Row> input)
|
||||
{
|
||||
return Sequences.filter(
|
||||
input,
|
||||
new Predicate<Row>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable Row input)
|
||||
{
|
||||
return GroupByQuery.this.havingSpec.eval(input);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
postProcFn
|
||||
);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public LimitSpec getLimitSpec()
|
||||
orderByLimitFn = postProcFn;
|
||||
}
|
||||
|
||||
/**
|
||||
* A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks
|
||||
* have already passed in order for the object to exist.
|
||||
*/
|
||||
private GroupByQuery(
|
||||
String dataSource,
|
||||
QuerySegmentSpec querySegmentSpec,
|
||||
DimFilter dimFilter,
|
||||
QueryGranularity granularity,
|
||||
List<DimensionSpec> dimensions,
|
||||
List<AggregatorFactory> aggregatorSpecs,
|
||||
List<PostAggregator> postAggregatorSpecs,
|
||||
HavingSpec havingSpec,
|
||||
LimitSpec orderBySpec,
|
||||
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
|
||||
Map<String, String> context
|
||||
)
|
||||
{
|
||||
return limitSpec;
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity;
|
||||
this.dimensions = dimensions;
|
||||
this.aggregatorSpecs = aggregatorSpecs;
|
||||
this.postAggregatorSpecs = postAggregatorSpecs;
|
||||
this.havingSpec = havingSpec;
|
||||
this.limitSpec = orderBySpec;
|
||||
this.orderByLimitFn = orderByLimitFn;
|
||||
}
|
||||
|
||||
@JsonProperty("filter")
|
||||
|
@ -129,6 +189,18 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return postAggregatorSpecs;
|
||||
}
|
||||
|
||||
@JsonProperty("having")
|
||||
public HavingSpec getHavingSpec()
|
||||
{
|
||||
return havingSpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public LimitSpec getOrderBy()
|
||||
{
|
||||
return limitSpec;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasFilters()
|
||||
{
|
||||
|
@ -152,12 +224,14 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return new GroupByQuery(
|
||||
getDataSource(),
|
||||
getQuerySegmentSpec(),
|
||||
limitSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
aggregatorSpecs,
|
||||
postAggregatorSpecs,
|
||||
havingSpec,
|
||||
limitSpec,
|
||||
orderByLimitFn,
|
||||
computeOverridenContext(contextOverride)
|
||||
);
|
||||
}
|
||||
|
@ -168,12 +242,14 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return new GroupByQuery(
|
||||
getDataSource(),
|
||||
spec,
|
||||
limitSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
aggregatorSpecs,
|
||||
postAggregatorSpecs,
|
||||
havingSpec,
|
||||
limitSpec,
|
||||
orderByLimitFn,
|
||||
getContext()
|
||||
);
|
||||
}
|
||||
|
@ -187,6 +263,8 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
private List<DimensionSpec> dimensions;
|
||||
private List<AggregatorFactory> aggregatorSpecs;
|
||||
private List<PostAggregator> postAggregatorSpecs;
|
||||
private HavingSpec havingSpec;
|
||||
|
||||
private Map<String, String> context;
|
||||
|
||||
private LimitSpec limitSpec = null;
|
||||
|
@ -205,6 +283,9 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
dimensions = builder.dimensions;
|
||||
aggregatorSpecs = builder.aggregatorSpecs;
|
||||
postAggregatorSpecs = builder.postAggregatorSpecs;
|
||||
havingSpec = builder.havingSpec;
|
||||
limit = builder.limit;
|
||||
|
||||
context = builder.context;
|
||||
}
|
||||
|
||||
|
@ -351,6 +432,20 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setHavingSpec(HavingSpec havingSpec)
|
||||
{
|
||||
this.havingSpec = havingSpec;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setLimit(Integer limit)
|
||||
{
|
||||
this.limit = limit;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder copy()
|
||||
{
|
||||
return new Builder(this);
|
||||
|
@ -361,20 +456,21 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
final LimitSpec theLimitSpec;
|
||||
if (limitSpec == null) {
|
||||
theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
theLimitSpec = limitSpec;
|
||||
}
|
||||
|
||||
return new GroupByQuery(
|
||||
dataSource,
|
||||
querySegmentSpec,
|
||||
theLimitSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
aggregatorSpecs,
|
||||
postAggregatorSpecs,
|
||||
havingSpec,
|
||||
null,
|
||||
theLimitSpec,
|
||||
context
|
||||
);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class AlwaysHavingSpec implements HavingSpec
|
||||
{
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The logical "and" operator for the "having" clause.
|
||||
*/
|
||||
public class AndHavingSpec implements HavingSpec
|
||||
{
|
||||
private List<HavingSpec> havingSpecs;
|
||||
|
||||
@JsonCreator
|
||||
public AndHavingSpec(@JsonProperty("havingSpecs") List<HavingSpec> havingSpecs)
|
||||
{
|
||||
this.havingSpecs = havingSpecs == null ? ImmutableList.<HavingSpec>of() : havingSpecs;
|
||||
}
|
||||
|
||||
@JsonProperty("havingSpecs")
|
||||
public List<HavingSpec> getHavingSpecs()
|
||||
{
|
||||
return havingSpecs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
for (HavingSpec havingSpec : havingSpecs) {
|
||||
if (!havingSpec.eval(row)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
AndHavingSpec that = (AndHavingSpec) o;
|
||||
|
||||
if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return havingSpecs != null ? havingSpecs.hashCode() : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("AndHavingSpec");
|
||||
sb.append("{havingSpecs=").append(havingSpecs);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
/**
|
||||
* The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value",
|
||||
* except that in SQL an aggregation is an expression instead of an aggregation name as in Druid.
|
||||
*/
|
||||
public class EqualToHavingSpec implements HavingSpec
|
||||
{
|
||||
private String aggregationName;
|
||||
private Number value;
|
||||
|
||||
@JsonCreator
|
||||
public EqualToHavingSpec(
|
||||
@JsonProperty("aggregation") String aggName,
|
||||
@JsonProperty("value") Number value
|
||||
)
|
||||
{
|
||||
this.aggregationName = aggName;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@JsonProperty("value")
|
||||
public Number getValue()
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
@JsonProperty("aggregation")
|
||||
public String getAggregationName()
|
||||
{
|
||||
return aggregationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
float metricValue = row.getFloatMetric(aggregationName);
|
||||
|
||||
return Float.compare(value.floatValue(), metricValue) == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method treats internal value as double mainly for ease of test.
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
EqualToHavingSpec that = (EqualToHavingSpec) o;
|
||||
|
||||
if (aggregationName != null ? !aggregationName.equals(that.aggregationName) : that.aggregationName != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (value != null && that.value != null) {
|
||||
return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0;
|
||||
}
|
||||
|
||||
if (value == null && that.value == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = aggregationName != null ? aggregationName.hashCode() : 0;
|
||||
result = 31 * result + (value != null ? value.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("EqualToHavingSpec");
|
||||
sb.append("{aggregationName='").append(aggregationName).append('\'');
|
||||
sb.append(", value=").append(value);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
/**
|
||||
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
|
||||
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
|
||||
*/
|
||||
public class GreaterThanHavingSpec implements HavingSpec
|
||||
{
|
||||
private String aggregationName;
|
||||
private Number value;
|
||||
|
||||
@JsonCreator
|
||||
public GreaterThanHavingSpec(
|
||||
@JsonProperty("aggregation") String aggName,
|
||||
@JsonProperty("value") Number value
|
||||
)
|
||||
{
|
||||
this.aggregationName = aggName;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@JsonProperty("aggregation")
|
||||
public String getAggregationName()
|
||||
{
|
||||
return aggregationName;
|
||||
}
|
||||
|
||||
@JsonProperty("value")
|
||||
public Number getValue()
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
float metricValue = row.getFloatMetric(aggregationName);
|
||||
|
||||
return Float.compare(metricValue, value.floatValue()) > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method treats internal value as double mainly for ease of test.
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
GreaterThanHavingSpec that = (GreaterThanHavingSpec) o;
|
||||
|
||||
if (value != null && that.value != null) {
|
||||
return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0;
|
||||
}
|
||||
|
||||
if (value == null && that.value == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = aggregationName != null ? aggregationName.hashCode() : 0;
|
||||
result = 31 * result + (value != null ? value.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("GreaterThanHavingSpec");
|
||||
sb.append("{aggregationName='").append(aggregationName).append('\'');
|
||||
sb.append(", value=").append(value);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
/**
|
||||
* A "having" clause that filters aggregated value. This is similar to SQL's "having"
|
||||
* clause.
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = AlwaysHavingSpec.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "and", value = AndHavingSpec.class),
|
||||
@JsonSubTypes.Type(name = "or", value = OrHavingSpec.class),
|
||||
@JsonSubTypes.Type(name = "not", value = NotHavingSpec.class),
|
||||
@JsonSubTypes.Type(name = "greaterThan", value = GreaterThanHavingSpec.class),
|
||||
@JsonSubTypes.Type(name = "lessThan", value = LessThanHavingSpec.class),
|
||||
@JsonSubTypes.Type(name = "equalTo", value = EqualToHavingSpec.class)
|
||||
})
|
||||
public interface HavingSpec
|
||||
{
|
||||
/**
|
||||
* Evaluates if a given row satisfies the having spec.
|
||||
*
|
||||
* @param row A Row of data that may contain aggregated values
|
||||
*
|
||||
* @return true if the given row satisfies the having spec. False otherwise.
|
||||
*
|
||||
* @see Row
|
||||
*/
|
||||
public boolean eval(Row row);
|
||||
|
||||
// Atoms for easy combination, but for now they are mostly useful
|
||||
// for testing.
|
||||
/**
|
||||
* A "having" spec that always evaluates to false
|
||||
*/
|
||||
public static final HavingSpec NEVER = new HavingSpec()
|
||||
{
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A "having" spec that always evaluates to true
|
||||
*/
|
||||
public static final HavingSpec ALWAYS = new AlwaysHavingSpec();
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
/**
|
||||
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
|
||||
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
|
||||
*/
|
||||
public class LessThanHavingSpec implements HavingSpec
|
||||
{
|
||||
private String aggregationName;
|
||||
private Number value;
|
||||
|
||||
public LessThanHavingSpec
|
||||
(
|
||||
@JsonProperty("aggregation") String aggName,
|
||||
@JsonProperty("value") Number value
|
||||
)
|
||||
{
|
||||
this.aggregationName = aggName;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@JsonProperty("aggregation")
|
||||
public String getAggregationName()
|
||||
{
|
||||
return aggregationName;
|
||||
}
|
||||
|
||||
@JsonProperty("value")
|
||||
public Number getValue()
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
float metricValue = row.getFloatMetric(aggregationName);
|
||||
|
||||
return Float.compare(metricValue, value.floatValue()) < 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method treats internal value as double mainly for ease of test.
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LessThanHavingSpec that = (LessThanHavingSpec) o;
|
||||
|
||||
if (value != null && that.value != null) {
|
||||
return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0;
|
||||
}
|
||||
|
||||
if (value == null && that.value == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = aggregationName != null ? aggregationName.hashCode() : 0;
|
||||
result = 31 * result + (value != null ? value.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("LessThanHavingSpec");
|
||||
sb.append("{aggregationName='").append(aggregationName).append('\'');
|
||||
sb.append(", value=").append(value);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
/**
|
||||
* The logical "not" operator for the "having" clause.
|
||||
*/
|
||||
public class NotHavingSpec implements HavingSpec
|
||||
{
|
||||
private HavingSpec havingSpec;
|
||||
|
||||
@JsonCreator
|
||||
public NotHavingSpec(@JsonProperty("havingSpec") HavingSpec havingSpec)
|
||||
{
|
||||
this.havingSpec = havingSpec;
|
||||
}
|
||||
|
||||
@JsonProperty("havingSpec")
|
||||
public HavingSpec getHavingSpec()
|
||||
{
|
||||
return havingSpec;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
return !havingSpec.eval(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("NotHavingSpec");
|
||||
sb.append("{havingSpec=").append(havingSpec);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
NotHavingSpec that = (NotHavingSpec) o;
|
||||
|
||||
if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return havingSpec != null ? havingSpec.hashCode() : 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The logical "or" operator for the "having" clause.
|
||||
*/
|
||||
public class OrHavingSpec implements HavingSpec
|
||||
{
|
||||
private List<HavingSpec> havingSpecs;
|
||||
|
||||
@JsonCreator
|
||||
public OrHavingSpec(@JsonProperty("havingSpecs") List<HavingSpec> havingSpecs) {
|
||||
this.havingSpecs = havingSpecs == null ? ImmutableList.<HavingSpec>of() : havingSpecs;
|
||||
}
|
||||
|
||||
@JsonProperty("havingSpecs")
|
||||
public List<HavingSpec> getHavingSpecs(){
|
||||
return havingSpecs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
for(HavingSpec havingSpec: havingSpecs) {
|
||||
if(havingSpec.eval(row)){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
OrHavingSpec that = (OrHavingSpec) o;
|
||||
|
||||
if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return havingSpecs != null ? havingSpecs.hashCode() : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("OrHavingSpec");
|
||||
sb.append("{havingSpecs=").append(havingSpecs);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -17,13 +17,14 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.group.limit;
|
||||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.primitives.Longs;
|
||||
|
@ -35,6 +36,8 @@ import com.metamx.druid.aggregation.post.PostAggregator;
|
|||
import com.metamx.druid.input.Row;
|
||||
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -55,7 +58,7 @@ public class DefaultLimitSpec implements LimitSpec
|
|||
this.columns = (columns == null) ? ImmutableList.<OrderByColumnSpec>of() : columns;
|
||||
this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
|
||||
|
||||
Preconditions.checkState(limit > 0, "limit[%s] must be >0", limit);
|
||||
Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -75,20 +78,22 @@ public class DefaultLimitSpec implements LimitSpec
|
|||
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
|
||||
)
|
||||
{
|
||||
if (columns.isEmpty()) {
|
||||
return new LimitingFn(limit);
|
||||
}
|
||||
|
||||
// Materialize the Comparator first for fast-fail error checking.
|
||||
final Comparator<Row> comparator = makeComparator(dimensions, aggs, postAggs);
|
||||
final Ordering<Row> ordering = makeComparator(dimensions, aggs, postAggs);
|
||||
|
||||
return new Function<Sequence<Row>, Sequence<Row>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Row> apply(Sequence<Row> input)
|
||||
{
|
||||
return Sequences.limit(Sequences.sort(input, comparator), limit);
|
||||
if (limit == Integer.MAX_VALUE) {
|
||||
return new SortingFn(ordering);
|
||||
}
|
||||
else {
|
||||
return new TopNFunction(ordering, limit);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Comparator<Row> makeComparator(
|
||||
private Ordering<Row> makeComparator(
|
||||
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
|
||||
)
|
||||
{
|
||||
|
@ -174,4 +179,57 @@ public class DefaultLimitSpec implements LimitSpec
|
|||
", limit=" + limit +
|
||||
'}';
|
||||
}
|
||||
|
||||
private static class LimitingFn implements Function<Sequence<Row>, Sequence<Row>>
|
||||
{
|
||||
private int limit;
|
||||
|
||||
public LimitingFn(int limit)
|
||||
{
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Row> apply(
|
||||
@Nullable Sequence<Row> input
|
||||
)
|
||||
{
|
||||
return Sequences.limit(input, limit);
|
||||
}
|
||||
}
|
||||
|
||||
private static class SortingFn implements Function<Sequence<Row>, Sequence<Row>>
|
||||
{
|
||||
private final Ordering<Row> ordering;
|
||||
|
||||
public SortingFn(Ordering<Row> ordering) {this.ordering = ordering;}
|
||||
|
||||
@Override
|
||||
public Sequence<Row> apply(@Nullable Sequence<Row> input)
|
||||
{
|
||||
return Sequences.sort(input, ordering);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TopNFunction implements Function<Sequence<Row>, Sequence<Row>>
|
||||
{
|
||||
private final TopNSorter<Row> sorter;
|
||||
private final int limit;
|
||||
|
||||
public TopNFunction(Ordering<Row> ordering, int limit)
|
||||
{
|
||||
this.limit = limit;
|
||||
|
||||
this.sorter = new TopNSorter<Row>(ordering);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Row> apply(
|
||||
@Nullable Sequence<Row> input
|
||||
)
|
||||
{
|
||||
final ArrayList<Row> materializedList = Sequences.toList(input, Lists.<Row>newArrayList());
|
||||
return Sequences.simple(sorter.toTopN(materializedList, limit));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.group.limit;
|
||||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
|
@ -1,4 +1,4 @@
|
|||
package com.metamx.druid.query.group.limit;
|
||||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
|
@ -1,12 +1,17 @@
|
|||
package com.metamx.druid.query.group.limit;
|
||||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -52,6 +57,46 @@ public class OrderByColumnSpec
|
|||
}
|
||||
}
|
||||
|
||||
public static OrderByColumnSpec asc(String dimension)
|
||||
{
|
||||
return new OrderByColumnSpec(dimension, Direction.ASCENDING);
|
||||
}
|
||||
|
||||
public static List<OrderByColumnSpec> ascending(String... dimension)
|
||||
{
|
||||
return Lists.transform(
|
||||
Arrays.asList(dimension),
|
||||
new Function<String, OrderByColumnSpec>()
|
||||
{
|
||||
@Override
|
||||
public OrderByColumnSpec apply(@Nullable String input)
|
||||
{
|
||||
return asc(input);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public static OrderByColumnSpec desc(String dimension)
|
||||
{
|
||||
return new OrderByColumnSpec(dimension, Direction.DESCENDING);
|
||||
}
|
||||
|
||||
public static List<OrderByColumnSpec> descending(String... dimension)
|
||||
{
|
||||
return Lists.transform(
|
||||
Arrays.asList(dimension),
|
||||
new Function<String, OrderByColumnSpec>()
|
||||
{
|
||||
@Override
|
||||
public OrderByColumnSpec apply(@Nullable String input)
|
||||
{
|
||||
return desc(input);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public OrderByColumnSpec(
|
||||
String dimension,
|
||||
Direction direction
|
|
@ -0,0 +1,43 @@
|
|||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Utility class that supports iterating a priority queue in sorted order.
|
||||
*/
|
||||
class OrderedPriorityQueueItems<T> implements Iterable<T>
|
||||
{
|
||||
private MinMaxPriorityQueue<T> rows;
|
||||
|
||||
public OrderedPriorityQueueItems(MinMaxPriorityQueue<T> rows)
|
||||
{
|
||||
this.rows = rows;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<T> iterator()
|
||||
{
|
||||
return new Iterator<T>() {
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return !rows.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T next()
|
||||
{
|
||||
return rows.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove()
|
||||
{
|
||||
throw new UnsupportedOperationException("Can't remove any item from an intermediary heap for orderBy/limit");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.google.common.collect.Ordering;
|
||||
|
||||
/**
|
||||
* A utility class that sorts a list of comparable items in the given order, and keeps only the
|
||||
* top N sorted items.
|
||||
*/
|
||||
public class TopNSorter<T>
|
||||
{
|
||||
private Ordering<T> ordering;
|
||||
|
||||
/**
|
||||
* Constructs a sorter that will sort items with given ordering.
|
||||
* @param ordering the order that this sorter instance will use for sorting
|
||||
*/
|
||||
public TopNSorter(Ordering<T> ordering)
|
||||
{
|
||||
this.ordering = ordering;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sorts a list of rows and retain the top n items
|
||||
* @param items the collections of items to be sorted
|
||||
* @param n the number of items to be retained
|
||||
* @return Top n items that are sorted in the order specified when this instance is constructed.
|
||||
*/
|
||||
public Iterable<T> toTopN(Iterable<T> items, int n)
|
||||
{
|
||||
if(n <= 0) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
|
||||
MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue.orderedBy(ordering).maximumSize(n).create(items);
|
||||
|
||||
return new OrderedPriorityQueueItems<T>(queue);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,227 @@
|
|||
package com.metamx.druid.query.group.having;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.druid.input.MapBasedInputRow;
|
||||
import com.metamx.druid.input.Row;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
||||
public class HavingSpecTest
|
||||
{
|
||||
private static final Row ROW = new MapBasedInputRow(0, new ArrayList<String>(), ImmutableMap.of("metric", (Object)Float.valueOf(10)));
|
||||
|
||||
@Test
|
||||
public void testHavingClauseSerde() throws Exception {
|
||||
List<HavingSpec> havings = Arrays.asList(
|
||||
new GreaterThanHavingSpec("agg", Double.valueOf(1.3)),
|
||||
new OrHavingSpec(
|
||||
Arrays.asList(
|
||||
new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
|
||||
new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2)))
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
HavingSpec andHavingSpec = new AndHavingSpec(havings);
|
||||
|
||||
Map<String, Object> notMap = ImmutableMap.<String, Object>of(
|
||||
"type", "not",
|
||||
"havingSpec", ImmutableMap.of("type", "equalTo", "aggregation", "equalAgg", "value", 2.0)
|
||||
);
|
||||
|
||||
Map<String, Object> lessMap = ImmutableMap.<String, Object>of(
|
||||
"type", "lessThan",
|
||||
"aggregation", "lessAgg",
|
||||
"value", 1
|
||||
);
|
||||
|
||||
Map<String, Object> greaterMap = ImmutableMap.<String, Object>of(
|
||||
"type", "greaterThan",
|
||||
"aggregation", "agg",
|
||||
"value", 1.3
|
||||
);
|
||||
|
||||
Map<String, Object> orMap = ImmutableMap.<String, Object>of(
|
||||
"type", "or",
|
||||
"havingSpecs", ImmutableList.of(lessMap, notMap)
|
||||
);
|
||||
|
||||
Map<String, Object> payloadMap = ImmutableMap.<String, Object>of(
|
||||
"type", "and",
|
||||
"havingSpecs", ImmutableList.of(greaterMap, orMap)
|
||||
);
|
||||
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
assertEquals(andHavingSpec, mapper.convertValue(payloadMap, AndHavingSpec.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGreaterThanHavingSpec() {
|
||||
GreaterThanHavingSpec spec = new GreaterThanHavingSpec("metric", 10.003);
|
||||
assertFalse(spec.eval(ROW));
|
||||
|
||||
spec = new GreaterThanHavingSpec("metric", 10);
|
||||
assertFalse(spec.eval(ROW));
|
||||
|
||||
spec = new GreaterThanHavingSpec("metric", 9);
|
||||
assertTrue(spec.eval(ROW));
|
||||
}
|
||||
|
||||
private MapBasedInputRow makeRow(long ts, String dim, int value)
|
||||
{
|
||||
List<String> dimensions = Lists.newArrayList(dim);
|
||||
Map<String, Object> metrics = ImmutableMap.of("metric", (Object) Float.valueOf(value));
|
||||
|
||||
return new MapBasedInputRow(ts, dimensions, metrics);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLessThanHavingSpec() {
|
||||
LessThanHavingSpec spec = new LessThanHavingSpec("metric", 10);
|
||||
assertFalse(spec.eval(ROW));
|
||||
|
||||
spec = new LessThanHavingSpec("metric", 11);
|
||||
assertTrue(spec.eval(ROW));
|
||||
|
||||
spec = new LessThanHavingSpec("metric", 9);
|
||||
assertFalse(spec.eval(ROW));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEqualHavingSpec() {
|
||||
EqualToHavingSpec spec = new EqualToHavingSpec("metric", 10);
|
||||
assertTrue(spec.eval(ROW));
|
||||
|
||||
spec = new EqualToHavingSpec("metric", 9);
|
||||
assertFalse(spec.eval(ROW));
|
||||
|
||||
spec = new EqualToHavingSpec("metric", 11);
|
||||
assertFalse(spec.eval(ROW));
|
||||
}
|
||||
|
||||
private static class CountingHavingSpec implements HavingSpec {
|
||||
|
||||
private final AtomicInteger counter;
|
||||
private final boolean value;
|
||||
private CountingHavingSpec(AtomicInteger counter, boolean value)
|
||||
{
|
||||
this.counter = counter;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
counter.incrementAndGet();
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAndHavingSpecShouldSupportShortcutEvaluation () {
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
AndHavingSpec spec = new AndHavingSpec(ImmutableList.of(
|
||||
(HavingSpec)new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, false)
|
||||
));
|
||||
|
||||
spec.eval(ROW);
|
||||
|
||||
assertEquals(2, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAndHavingSpec () {
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
AndHavingSpec spec = new AndHavingSpec(ImmutableList.of(
|
||||
(HavingSpec)new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true)
|
||||
));
|
||||
|
||||
spec.eval(ROW);
|
||||
|
||||
assertEquals(4, counter.get());
|
||||
|
||||
counter.set(0);
|
||||
spec = new AndHavingSpec(ImmutableList.of(
|
||||
(HavingSpec)new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true)
|
||||
));
|
||||
|
||||
spec.eval(ROW);
|
||||
|
||||
assertEquals(1, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOrHavingSpecSupportsShortcutEvaluation() {
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
OrHavingSpec spec = new OrHavingSpec(ImmutableList.of(
|
||||
(HavingSpec)new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, true),
|
||||
new CountingHavingSpec(counter, false)
|
||||
));
|
||||
|
||||
spec.eval(ROW);
|
||||
|
||||
assertEquals(1, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOrHavingSpec () {
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
OrHavingSpec spec = new OrHavingSpec(ImmutableList.of(
|
||||
(HavingSpec)new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, false)
|
||||
));
|
||||
|
||||
spec.eval(ROW);
|
||||
|
||||
assertEquals(4, counter.get());
|
||||
|
||||
counter.set(0);
|
||||
spec = new OrHavingSpec(ImmutableList.of(
|
||||
(HavingSpec)new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, false),
|
||||
new CountingHavingSpec(counter, true)
|
||||
));
|
||||
|
||||
spec.eval(ROW);
|
||||
|
||||
assertEquals(4, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotHavingSepc() {
|
||||
NotHavingSpec spec = new NotHavingSpec(HavingSpec.NEVER);
|
||||
assertTrue(spec.eval(ROW));
|
||||
|
||||
spec = new NotHavingSpec(HavingSpec.ALWAYS);
|
||||
assertFalse(spec.eval(ROW));
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package com.metamx.druid.query.group.orderby;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TopNSorterTest
|
||||
{
|
||||
private static final long SEED = 2L;
|
||||
private static final Ordering<String> ASC = Ordering.natural();
|
||||
private static final Ordering<String> DESC = Ordering.natural().reverse();
|
||||
|
||||
private static final List<String> EMPTY = Collections.EMPTY_LIST;
|
||||
private static final List<String> SINGLE = Lists.newArrayList("a");
|
||||
private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
|
||||
private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
|
||||
|
||||
private Ordering<String> ordering;
|
||||
private List<String> rawInput;
|
||||
private int limit;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> makeTestData(){
|
||||
Object[][] data = new Object[][] {
|
||||
{ ASC, RAW_ASC, RAW_ASC.size() - 2},
|
||||
{ ASC, RAW_ASC, RAW_ASC.size()},
|
||||
{ ASC, RAW_ASC, RAW_ASC.size() + 2},
|
||||
{ ASC, RAW_ASC, 0},
|
||||
{ ASC, SINGLE, 0},
|
||||
{ ASC, SINGLE, 1},
|
||||
{ ASC, SINGLE, 2},
|
||||
{ ASC, SINGLE, 3},
|
||||
{ ASC, EMPTY, 0},
|
||||
{ ASC, EMPTY, 1},
|
||||
{ DESC, RAW_DESC, RAW_DESC.size() - 2},
|
||||
{ DESC, RAW_DESC, RAW_DESC.size()},
|
||||
{ DESC, RAW_DESC, RAW_DESC.size() + 2},
|
||||
{ DESC, RAW_DESC, 0},
|
||||
{ DESC, RAW_DESC, 0},
|
||||
{ DESC, SINGLE, 1},
|
||||
{ DESC, SINGLE, 2},
|
||||
{ DESC, SINGLE, 3},
|
||||
{ DESC, EMPTY, 0},
|
||||
{ DESC, EMPTY, 1},
|
||||
};
|
||||
|
||||
return Arrays.asList(data);
|
||||
}
|
||||
|
||||
public TopNSorterTest(Ordering<String> ordering, List<String> rawInput, int limit){
|
||||
this.ordering = ordering;
|
||||
this.rawInput = rawInput;
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOrderByWithLimit()
|
||||
{
|
||||
List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
|
||||
List<String> inputs = Lists.newArrayList(rawInput);
|
||||
Collections.shuffle(inputs, new Random(2));
|
||||
|
||||
Iterable<String> result = new TopNSorter<String>(ordering).toTopN(inputs, limit);
|
||||
|
||||
Assert.assertEquals(expected, Lists.newArrayList(result));
|
||||
}
|
||||
}
|
|
@ -120,12 +120,6 @@
|
|||
<outputFile>
|
||||
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
|
||||
</outputFile>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>org.codehaus.jackson</pattern>
|
||||
<shadedPattern>druid.org.codehaus.jackson</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
|
|
10
pom.xml
10
pom.xml
|
@ -209,16 +209,6 @@
|
|||
<artifactId>jackson-jaxrs-json-provider</artifactId>
|
||||
<version>2.1.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-core-asl</artifactId>
|
||||
<version>1.9.11</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
<version>1.9.11</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.inject</groupId>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestHelper
|
|||
|
||||
assertResult(failMsg, expectedNext, next);
|
||||
assertResult(
|
||||
String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg),
|
||||
String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
|
||||
expectedNext,
|
||||
next2
|
||||
);
|
||||
|
@ -77,20 +77,20 @@ public class TestHelper
|
|||
|
||||
if (resultsIter.hasNext()) {
|
||||
Assert.fail(
|
||||
String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
);
|
||||
}
|
||||
|
||||
if (resultsIter2.hasNext()) {
|
||||
Assert.fail(
|
||||
String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
);
|
||||
}
|
||||
|
||||
if (expectedResultsIter.hasNext()) {
|
||||
Assert.fail(
|
||||
String.format(
|
||||
"%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
|
||||
"%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ public class TestHelper
|
|||
|
||||
Assert.assertEquals(failMsg, expectedNext, next);
|
||||
Assert.assertEquals(
|
||||
String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg),
|
||||
String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
|
||||
expectedNext,
|
||||
next2
|
||||
);
|
||||
|
@ -117,20 +117,20 @@ public class TestHelper
|
|||
|
||||
if (resultsIter.hasNext()) {
|
||||
Assert.fail(
|
||||
String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
);
|
||||
}
|
||||
|
||||
if (resultsIter2.hasNext()) {
|
||||
Assert.fail(
|
||||
String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
|
||||
);
|
||||
}
|
||||
|
||||
if (expectedResultsIter.hasNext()) {
|
||||
Assert.fail(
|
||||
String.format(
|
||||
"%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
|
||||
"%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -22,9 +22,12 @@ package com.metamx.druid.query.group;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.druid.PeriodGranularity;
|
||||
|
@ -41,7 +44,12 @@ import com.metamx.druid.query.QueryRunner;
|
|||
import com.metamx.druid.query.QueryRunnerTestHelper;
|
||||
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
|
||||
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||
import com.metamx.druid.query.group.limit.OrderByColumnSpec;
|
||||
import com.metamx.druid.query.group.having.EqualToHavingSpec;
|
||||
import com.metamx.druid.query.group.having.GreaterThanHavingSpec;
|
||||
import com.metamx.druid.query.group.having.OrHavingSpec;
|
||||
import com.metamx.druid.query.group.orderby.DefaultLimitSpec;
|
||||
import com.metamx.druid.query.group.orderby.LimitSpec;
|
||||
import com.metamx.druid.query.group.orderby.OrderByColumnSpec;
|
||||
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
@ -56,6 +64,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -89,7 +98,9 @@ public class GroupByQueryRunnerTest
|
|||
}
|
||||
)
|
||||
),
|
||||
new GroupByQueryRunnerFactoryConfig(){}
|
||||
new GroupByQueryRunnerFactoryConfig()
|
||||
{
|
||||
}
|
||||
);
|
||||
|
||||
return Lists.newArrayList(
|
||||
|
@ -106,13 +117,15 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
}
|
||||
|
||||
public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner) {
|
||||
public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner)
|
||||
{
|
||||
this.factory = factory;
|
||||
this.runner = runner;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupBy() {
|
||||
public void testGroupBy()
|
||||
{
|
||||
GroupByQuery query = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
|
@ -155,7 +168,8 @@ public class GroupByQueryRunnerTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGroupByWithTimeZone() {
|
||||
public void testGroupByWithTimeZone()
|
||||
{
|
||||
DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
|
||||
|
||||
GroupByQuery query = GroupByQuery.builder()
|
||||
|
@ -218,7 +232,8 @@ public class GroupByQueryRunnerTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMergeResults() {
|
||||
public void testMergeResults()
|
||||
{
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
|
@ -282,7 +297,180 @@ public class GroupByQueryRunnerTest
|
|||
|
||||
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
|
||||
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeResultsWithLimit()
|
||||
{
|
||||
for (int limit = 1; limit < 20; ++limit) {
|
||||
doTestMergeResultsWithValidLimit(limit);
|
||||
}
|
||||
}
|
||||
|
||||
private void doTestMergeResultsWithValidLimit(final int limit)
|
||||
{
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
.setInterval("2011-04-02/2011-04-04")
|
||||
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||
.setAggregatorSpecs(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new LongSumAggregatorFactory("idx", "index")
|
||||
)
|
||||
)
|
||||
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
|
||||
.setLimit(Integer.valueOf(limit));
|
||||
|
||||
final GroupByQuery fullQuery = builder.build();
|
||||
|
||||
List<Row> expectedResults = Arrays.asList(
|
||||
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
|
||||
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
|
||||
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
|
||||
createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
|
||||
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
|
||||
createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
|
||||
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
|
||||
createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
|
||||
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
|
||||
);
|
||||
|
||||
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
|
||||
|
||||
mergeRunner.run(fullQuery).accumulate(null, new Accumulator<Object, Row>()
|
||||
{
|
||||
@Override
|
||||
public Object accumulate(Object o, Row row)
|
||||
{
|
||||
System.out.printf("%d: %s%n", limit, row);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
TestHelper.assertExpectedObjects(
|
||||
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery), String.format("limit: %d", limit)
|
||||
);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testMergeResultsWithNegativeLimit()
|
||||
{
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
.setInterval("2011-04-02/2011-04-04")
|
||||
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||
.setAggregatorSpecs(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new LongSumAggregatorFactory("idx", "index")
|
||||
)
|
||||
)
|
||||
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
|
||||
.setLimit(Integer.valueOf(-1));
|
||||
|
||||
builder.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeResultsWithOrderBy()
|
||||
{
|
||||
LimitSpec[] orderBySpecs = new LimitSpec[]{
|
||||
new DefaultLimitSpec(OrderByColumnSpec.ascending("idx"), null),
|
||||
new DefaultLimitSpec(OrderByColumnSpec.ascending("rows", "idx"), null),
|
||||
new DefaultLimitSpec(OrderByColumnSpec.descending("idx"), null),
|
||||
new DefaultLimitSpec(OrderByColumnSpec.descending("rows", "idx"), null),
|
||||
};
|
||||
|
||||
final Comparator<Row> idxComparator =
|
||||
new Comparator<Row>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Row o1, Row o2)
|
||||
{
|
||||
return Float.compare(o1.getFloatMetric("idx"), o2.getFloatMetric("idx"));
|
||||
}
|
||||
};
|
||||
|
||||
Comparator<Row> rowsIdxComparator =
|
||||
new Comparator<Row>()
|
||||
{
|
||||
|
||||
@Override
|
||||
public int compare(Row o1, Row o2)
|
||||
{
|
||||
int value = Float.compare(o1.getFloatMetric("rows"), o2.getFloatMetric("rows"));
|
||||
if (value != 0) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return idxComparator.compare(o1, o2);
|
||||
}
|
||||
};
|
||||
|
||||
List<Row> allResults = Arrays.asList(
|
||||
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
|
||||
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
|
||||
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
|
||||
createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
|
||||
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
|
||||
createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
|
||||
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
|
||||
createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
|
||||
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
|
||||
);
|
||||
|
||||
List<List<Row>> expectedResults = Lists.newArrayList(
|
||||
Ordering.from(idxComparator).sortedCopy(allResults),
|
||||
Ordering.from(rowsIdxComparator).sortedCopy(allResults),
|
||||
Ordering.from(idxComparator).reverse().sortedCopy(allResults),
|
||||
Ordering.from(rowsIdxComparator).reverse().sortedCopy(allResults)
|
||||
);
|
||||
|
||||
for (int i = 0; i < orderBySpecs.length; ++i) {
|
||||
doTestMergeResultsWithOrderBy(orderBySpecs[i], expectedResults.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
private void doTestMergeResultsWithOrderBy(LimitSpec orderBySpec, List<Row> expectedResults)
|
||||
{
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
.setInterval("2011-04-02/2011-04-04")
|
||||
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||
.setAggregatorSpecs(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new LongSumAggregatorFactory("idx", "index")
|
||||
)
|
||||
)
|
||||
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
|
||||
.setLimitSpec(orderBySpec);
|
||||
|
||||
final GroupByQuery fullQuery = builder.build();
|
||||
|
||||
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
|
||||
new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query<Row> query)
|
||||
{
|
||||
// simulate two daily segments
|
||||
final Query query1 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
|
||||
);
|
||||
final Query query2 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
|
||||
);
|
||||
return Sequences.concat(runner.run(query1), runner.run(query2));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -403,6 +591,59 @@ public class GroupByQueryRunnerTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHavingSpec()
|
||||
{
|
||||
List<Row> expectedResults = Arrays.asList(
|
||||
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
|
||||
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
|
||||
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L)
|
||||
);
|
||||
|
||||
GroupByQuery.Builder builder = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
.setInterval("2011-04-02/2011-04-04")
|
||||
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||
.setAggregatorSpecs(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new LongSumAggregatorFactory("idx", "index")
|
||||
)
|
||||
)
|
||||
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
|
||||
.setHavingSpec(
|
||||
new OrHavingSpec(
|
||||
ImmutableList.of(
|
||||
new GreaterThanHavingSpec("rows", 2L),
|
||||
new EqualToHavingSpec("idx", 217L)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
final GroupByQuery fullQuery = builder.build();
|
||||
|
||||
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
|
||||
new QueryRunner<Row>()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query<Row> query)
|
||||
{
|
||||
// simulate two daily segments
|
||||
final Query query1 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
|
||||
);
|
||||
final Query query2 = query.withQuerySegmentSpec(
|
||||
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
|
||||
);
|
||||
return Sequences.concat(runner.run(query1), runner.run(query2));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||
}
|
||||
|
||||
private Row createExpectedRow(final String timestamp, Object... vals)
|
||||
{
|
||||
return createExpectedRow(new DateTime(timestamp), vals);
|
||||
|
@ -417,6 +658,6 @@ public class GroupByQueryRunnerTest
|
|||
theVals.put(vals[i].toString(), vals[i + 1]);
|
||||
}
|
||||
|
||||
return new MapBasedRow(timestamp, theVals);
|
||||
return new MapBasedRow(new DateTime(timestamp), theVals);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue