Merge branch 'master' into spatial

fjy committed 2013-05-03 23:09:30 -07:00
commit 8902986069
19 changed files with 530 additions and 210 deletions

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -1,150 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.query.group;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;

import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;

/**
 */
public class DefaultLimitSpec implements LimitSpec
{
  private static final byte CACHE_TYPE_ID = 0x0;

  private static Joiner JOINER = Joiner.on("");

  private final List<String> orderBy;
  private final int limit;
  private final Comparator<Row> comparator;

  @JsonCreator
  public DefaultLimitSpec(
      @JsonProperty("orderBy") List<String> orderBy,
      @JsonProperty("limit") int limit
  )
  {
    this.orderBy = (orderBy == null) ? Lists.<String>newArrayList() : orderBy;
    this.limit = limit;
    this.comparator = makeComparator();
  }

  public DefaultLimitSpec()
  {
    this.orderBy = Lists.newArrayList();
    this.limit = 0;
    this.comparator = makeComparator();
  }

  @JsonProperty
  @Override
  public List<String> getOrderBy()
  {
    return orderBy;
  }

  @JsonProperty
  @Override
  public int getLimit()
  {
    return limit;
  }

  @Override
  public Comparator<Row> getComparator()
  {
    return comparator;
  }

  @Override
  public byte[] getCacheKey()
  {
    byte[] orderByBytes = JOINER.join(orderBy).getBytes();
    byte[] limitBytes = Ints.toByteArray(limit);

    return ByteBuffer.allocate(1 + orderByBytes.length + limitBytes.length)
                     .put(CACHE_TYPE_ID)
                     .put(orderByBytes)
                     .put(limitBytes)
                     .array();
  }

  @Override
  public String toString()
  {
    return "DefaultLimitSpec{" +
           "orderBy='" + orderBy + '\'' +
           ", limit=" + limit +
           '}';
  }

  private Comparator<Row> makeComparator()
  {
    Ordering<Row> ordering = new Ordering<Row>()
    {
      @Override
      public int compare(Row left, Row right)
      {
        return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
      }
    };

    for (final String dimension : orderBy) {
      ordering = ordering.compound(
          new Comparator<Row>()
          {
            @Override
            public int compare(Row left, Row right)
            {
              if (left instanceof MapBasedRow && right instanceof MapBasedRow) {
                // There are no multi-value dimensions at this point, they should have been flattened out
                String leftDimVal = left.getDimension(dimension).get(0);
                String rightDimVal = right.getDimension(dimension).get(0);
                return leftDimVal.compareTo(rightDimVal);
              } else {
                throw new ISE("Unknown type for rows[%s, %s]", left.getClass(), right.getClass());
              }
            }
          }
      );
    }

    final Ordering<Row> theOrdering = ordering;

    return new Comparator<Row>()
    {
      @Override
      public int compare(Row row, Row row2)
      {
        return theOrdering.compare(row, row2);
      }
    };
  }
}

View File

@@ -21,9 +21,12 @@ package com.metamx.druid.query.group;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.Sequence;
 import com.metamx.druid.BaseQuery;
 import com.metamx.druid.Query;
 import com.metamx.druid.QueryGranularity;
@@ -34,6 +37,9 @@ import com.metamx.druid.query.Queries;
 import com.metamx.druid.query.dimension.DefaultDimensionSpec;
 import com.metamx.druid.query.dimension.DimensionSpec;
 import com.metamx.druid.query.filter.DimFilter;
+import com.metamx.druid.query.group.limit.DefaultLimitSpec;
+import com.metamx.druid.query.group.limit.LimitSpec;
+import com.metamx.druid.query.group.limit.OrderByColumnSpec;
 import com.metamx.druid.query.segment.LegacySegmentSpec;
 import com.metamx.druid.query.segment.QuerySegmentSpec;
@@ -56,6 +62,8 @@ public class GroupByQuery extends BaseQuery<Row>
   private final List<AggregatorFactory> aggregatorSpecs;
   private final List<PostAggregator> postAggregatorSpecs;
 
+  private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn;
+
   @JsonCreator
   public GroupByQuery(
       @JsonProperty("dataSource") String dataSource,
@@ -80,6 +88,8 @@ public class GroupByQuery extends BaseQuery<Row>
     Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
     Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
     Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
+
+    orderByLimitFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
   }
 
   @JsonProperty
@@ -130,6 +140,11 @@ public class GroupByQuery extends BaseQuery<Row>
     return Query.GROUP_BY;
   }
 
+  public Sequence<Row> applyLimit(Sequence<Row> results)
+  {
+    return orderByLimitFn.apply(results);
+  }
+
   @Override
   public GroupByQuery withOverriddenContext(Map<String, String> contextOverride)
   {
@@ -166,7 +181,6 @@ public class GroupByQuery extends BaseQuery<Row>
   {
     private String dataSource;
     private QuerySegmentSpec querySegmentSpec;
-    private LimitSpec limitSpec;
     private DimFilter dimFilter;
     private QueryGranularity granularity;
     private List<DimensionSpec> dimensions;
@@ -174,6 +188,10 @@ public class GroupByQuery extends BaseQuery<Row>
     private List<PostAggregator> postAggregatorSpecs;
     private Map<String, String> context;
 
+    private LimitSpec limitSpec = null;
+    private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
+    private int limit = Integer.MAX_VALUE;
+
     private Builder() {}
 
     private Builder(Builder builder)
@@ -200,12 +218,56 @@ public class GroupByQuery extends BaseQuery<Row>
       return setQuerySegmentSpec(new LegacySegmentSpec(interval));
     }
 
+    public Builder limit(int limit)
+    {
+      ensureExplicitLimitNotSet();
+      this.limit = limit;
+      return this;
+    }
+
+    public Builder addOrderByColumn(String dimension)
+    {
+      return addOrderByColumn(dimension, (OrderByColumnSpec.Direction) null);
+    }
+
+    public Builder addOrderByColumn(String dimension, String direction)
+    {
+      return addOrderByColumn(dimension, OrderByColumnSpec.determineDirection(direction));
+    }
+
+    public Builder addOrderByColumn(String dimension, OrderByColumnSpec.Direction direction)
+    {
+      return addOrderByColumn(new OrderByColumnSpec(dimension, direction));
+    }
+
+    public Builder addOrderByColumn(OrderByColumnSpec columnSpec)
+    {
+      ensureExplicitLimitNotSet();
+      this.orderByColumnSpecs.add(columnSpec);
+      return this;
+    }
+
     public Builder setLimitSpec(LimitSpec limitSpec)
     {
+      ensureFluentLimitsNotSet();
       this.limitSpec = limitSpec;
       return this;
     }
 
+    private void ensureExplicitLimitNotSet()
+    {
+      if (limitSpec != null) {
+        throw new ISE("Ambiguous build, limitSpec[%s] already set", limitSpec);
+      }
+    }
+
+    private void ensureFluentLimitsNotSet()
+    {
+      if (! (limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty()) ) {
+        throw new ISE("Ambiguous build, limit[%s] or columnSpecs[%s] already set.", limit, orderByColumnSpecs);
+      }
+    }
+
     public Builder setQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
     {
       this.querySegmentSpec = querySegmentSpec;
@@ -295,10 +357,18 @@ public class GroupByQuery extends BaseQuery<Row>
 
     public GroupByQuery build()
     {
+      final LimitSpec theLimitSpec;
+      if (limitSpec == null) {
+        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+      }
+      else {
+        theLimitSpec = limitSpec;
+      }
+
      return new GroupByQuery(
          dataSource,
          querySegmentSpec,
-         limitSpec,
+         theLimitSpec,
          dimFilter,
          granularity,
          dimensions,
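
Note on the new fluent API above: a minimal usage sketch, assuming this era's QueryGranularity.ALL constant and package layout for LongSumAggregatorFactory (the data source and column names here are hypothetical; the tests further down exercise the same calls against real fixtures). build() folds the fluent calls into new DefaultLimitSpec(orderByColumnSpecs, limit).

import java.util.Arrays;

import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.limit.OrderByColumnSpec;

public class OrderByBuilderSketch
{
  public static void main(String[] args)
  {
    // Order by "idx" descending and keep the first 5 rows.
    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource("wikipedia")  // hypothetical data source
        .setInterval("2011-04-02/2011-04-04")
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory("idx", "index"))
        )
        .addOrderByColumn("idx", OrderByColumnSpec.Direction.DESCENDING)
        .limit(5)
        .setGranularity(QueryGranularity.ALL)
        .build();

    // Mixing styles is rejected: calling setLimitSpec(...) on this builder now, or
    // limit()/addOrderByColumn() after setLimitSpec(...), throws ISE("Ambiguous build, ...").
    System.out.println(query);
  }
}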

View File

@@ -25,8 +25,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.primitives.Longs;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.ConcatSequence;
@@ -50,7 +48,6 @@ import org.joda.time.Interval;
 import org.joda.time.Minutes;
 
 import javax.annotation.Nullable;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -159,15 +156,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
       }
     );
 
-    // sort results to be returned
-    if (!query.getLimitSpec().getOrderBy().isEmpty()) {
-      retVal = Sequences.sort(retVal, query.getLimitSpec().getComparator());
-    }
-
-    return Sequences.limit(
-        retVal,
-        (query.getLimitSpec().getLimit() > 0) ? query.getLimitSpec().getLimit() : maxRows
-    );
+    return query.applyLimit(retVal);
   }
 
   @Override

View File

@@ -0,0 +1,184 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.query.group.limit;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.dimension.DimensionSpec;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

/**
 */
public class DefaultLimitSpec implements LimitSpec
{
  private final List<OrderByColumnSpec> orderBy;
  private final int limit;

  @JsonCreator
  public DefaultLimitSpec(
      @JsonProperty("orderBy") List<OrderByColumnSpec> orderBy,
      @JsonProperty("limit") int limit
  )
  {
    this.orderBy = (orderBy == null) ? ImmutableList.<OrderByColumnSpec>of() : orderBy;
    this.limit = limit;

    Preconditions.checkState(limit > 0, "limit[%s] must be >0", limit);
  }

  public DefaultLimitSpec()
  {
    this.orderBy = Lists.newArrayList();
    this.limit = 0;
  }

  @JsonProperty
  public List<OrderByColumnSpec> getOrderBy()
  {
    return orderBy;
  }

  @JsonProperty
  public int getLimit()
  {
    return limit;
  }

  @Override
  public Function<Sequence<Row>, Sequence<Row>> build(
      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
  )
  {
    // Materialize the Comparator first for fast-fail error checking.
    final Comparator<Row> comparator = makeComparator(dimensions, aggs, postAggs);

    return new Function<Sequence<Row>, Sequence<Row>>()
    {
      @Override
      public Sequence<Row> apply(Sequence<Row> input)
      {
        return Sequences.limit(Sequences.sort(input, comparator), limit);
      }
    };
  }

  private Comparator<Row> makeComparator(
      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
  )
  {
    Ordering<Row> ordering = new Ordering<Row>()
    {
      @Override
      public int compare(Row left, Row right)
      {
        return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
      }
    };

    Map<String, Ordering<Row>> possibleOrderings = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
    for (DimensionSpec spec : dimensions) {
      final String dimension = spec.getOutputName();
      possibleOrderings.put(dimension, dimensionOrdering(dimension));
    }

    for (final AggregatorFactory agg : aggs) {
      final String column = agg.getName();
      possibleOrderings.put(column, metricOrdering(column, agg.getComparator()));
    }

    for (PostAggregator postAgg : postAggs) {
      final String column = postAgg.getName();
      possibleOrderings.put(column, metricOrdering(column, postAgg.getComparator()));
    }

    for (OrderByColumnSpec columnSpec : orderBy) {
      Ordering<Row> nextOrdering = possibleOrderings.get(columnSpec.getDimension());

      if (nextOrdering == null) {
        throw new ISE("Unknown column in order clause[%s]", columnSpec);
      }

      switch (columnSpec.getDirection()) {
        case DESCENDING:
          nextOrdering = nextOrdering.reverse();
      }

      ordering = ordering.compound(nextOrdering);
    }

    return ordering;
  }

  private Ordering<Row> metricOrdering(final String column, final Comparator comparator)
  {
    return new Ordering<Row>()
    {
      @SuppressWarnings("unchecked")
      @Override
      public int compare(Row left, Row right)
      {
        return comparator.compare(left.getFloatMetric(column), right.getFloatMetric(column));
      }
    };
  }

  private Ordering<Row> dimensionOrdering(final String dimension)
  {
    return Ordering.natural()
                   .nullsFirst()
                   .onResultOf(
                       new Function<Row, String>()
                       {
                         @Override
                         public String apply(Row input)
                         {
                           // Multi-value dimensions have all been flattened at this point;
                           final List<String> dimList = input.getDimension(dimension);
                           return dimList.isEmpty() ? null : dimList.get(0);
                         }
                       }
                   );
  }

  @Override
  public String toString()
  {
    return "DefaultLimitSpec{" +
           "orderBy='" + orderBy + '\'' +
           ", limit=" + limit +
           '}';
  }
}
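
A standalone sketch of how build() above is meant to be applied, assuming Sequences.simple(...) from com.metamx.common.guava (Sequences.sort, Sequences.limit, and Sequences.toList all appear in this code base) and hypothetical row data. All rows share one timestamp, so the orderBy column, not the timestamp-first comparison, decides the order.

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.group.limit.DefaultLimitSpec;
import com.metamx.druid.query.group.limit.LimitSpec;
import com.metamx.druid.query.group.limit.OrderByColumnSpec;
import org.joda.time.DateTime;

public class DefaultLimitSpecSketch
{
  public static void main(String[] args)
  {
    final DateTime t = new DateTime("2011-04-01");

    // Three rows with identical timestamps and hypothetical "alias" values.
    List<Row> rows = Arrays.<Row>asList(
        new MapBasedRow(t, ImmutableMap.<String, Object>of("alias", "automotive")),
        new MapBasedRow(t, ImmutableMap.<String, Object>of("alias", "travel")),
        new MapBasedRow(t, ImmutableMap.<String, Object>of("alias", "business"))
    );

    // Order by "alias" descending, keep 2 rows.
    LimitSpec spec = new DefaultLimitSpec(
        Arrays.asList(new OrderByColumnSpec("alias", OrderByColumnSpec.Direction.DESCENDING)),
        2
    );

    // The dimensions/aggs/postAggs lists supply the legal orderBy names;
    // an unknown name fails fast with ISE("Unknown column in order clause...").
    Function<Sequence<Row>, Sequence<Row>> fn = spec.build(
        Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("alias", "alias")),
        ImmutableList.<AggregatorFactory>of(),
        ImmutableList.<PostAggregator>of()
    );

    // Expected: travel, business (alias descending, limited to 2).
    List<Row> sorted = Sequences.toList(fn.apply(Sequences.simple(rows)), Lists.<Row>newArrayList());
    System.out.println(sorted);
  }
}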

View File

@@ -17,28 +17,26 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  */
 
-package com.metamx.druid.query.group;
+package com.metamx.druid.query.group.limit;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Function;
+import com.metamx.common.guava.Sequence;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.post.PostAggregator;
 import com.metamx.druid.input.Row;
+import com.metamx.druid.query.dimension.DimensionSpec;
 
-import java.util.Comparator;
 import java.util.List;
 
 /**
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopLimitSpec.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class)
 })
 public interface LimitSpec
 {
-  public List<String> getOrderBy();
-
-  public int getLimit();
-
-  public Comparator<Row> getComparator();
-
-  public byte[] getCacheKey();
+  public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs);
 }
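
A sketch of the deserialization behavior implied by the new annotations, assuming a plain Jackson ObjectMapper suffices for these beans: "type": "default" routes to DefaultLimitSpec, while a missing type falls back to defaultImpl = NoopLimitSpec, so a groupBy query with no limitSpec gets a no-op transform.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.query.group.limit.LimitSpec;

public class LimitSpecJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // A bare string in orderBy goes through OrderByColumnSpec.create(Object)
    // and defaults to ASCENDING.
    LimitSpec explicit = mapper.readValue(
        "{\"type\": \"default\", \"orderBy\": [\"alias\"], \"limit\": 10}",
        LimitSpec.class
    );

    // No "type" property at all resolves via defaultImpl to NoopLimitSpec.
    LimitSpec fallback = mapper.readValue("{}", LimitSpec.class);

    System.out.println(explicit.getClass().getSimpleName() + " / " + fallback.getClass().getSimpleName());
  }
}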

View File

@@ -0,0 +1,24 @@
package com.metamx.druid.query.group.limit;

import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.dimension.DimensionSpec;

import java.util.List;

/**
 */
public class NoopLimitSpec implements LimitSpec
{
  @Override
  public Function<Sequence<Row>, Sequence<Row>> build(
      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
  )
  {
    return Functions.identity();
  }
}

View File

@@ -0,0 +1,120 @@
package com.metamx.druid.query.group.limit;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.IAE;
import com.metamx.common.ISE;

import java.util.Map;

/**
 */
public class OrderByColumnSpec
{
  /**
   * Maintain a map of the enum values so that we can just do a lookup and get a null if it doesn't exist instead
   * of an exception thrown.
   */
  private static final Map<String, Direction> stupidEnumMap;

  static
  {
    final ImmutableMap.Builder<String, Direction> bob = ImmutableMap.builder();
    for (Direction direction : Direction.values()) {
      bob.put(direction.toString(), direction);
    }
    stupidEnumMap = bob.build();
  }

  private final String dimension;
  private final Direction direction;

  @JsonCreator
  public static OrderByColumnSpec create(Object obj)
  {
    Preconditions.checkNotNull(obj, "Cannot build an OrderByColumnSpec from a null object.");

    if (obj instanceof String) {
      return new OrderByColumnSpec(obj.toString(), null);
    }
    else if (obj instanceof Map) {
      final Map map = (Map) obj;

      final String dimension = map.get("dimension").toString();
      final Direction direction = determineDirection(map.get("direction"));

      return new OrderByColumnSpec(dimension, direction);
    }
    else {
      throw new ISE("Cannot build an OrderByColumnSpec from a %s", obj.getClass());
    }
  }

  public OrderByColumnSpec(
      String dimension,
      Direction direction
  )
  {
    this.dimension = dimension;
    this.direction = direction == null ? Direction.ASCENDING : direction;
  }

  @JsonProperty
  public String getDimension()
  {
    return dimension;
  }

  @JsonProperty
  public Direction getDirection()
  {
    return direction;
  }

  public static Direction determineDirection(Object directionObj)
  {
    if (directionObj == null) {
      return null;
    }

    String directionString = directionObj.toString();

    Direction direction = stupidEnumMap.get(directionString);

    if (direction == null) {
      final String lowerDimension = directionString.toLowerCase();

      for (Direction dir : Direction.values()) {
        if (dir.toString().toLowerCase().startsWith(lowerDimension)) {
          if (direction != null) {
            throw new ISE("Ambiguous directions[%s] and [%s]", direction, dir);
          }
          direction = dir;
        }
      }
    }

    if (direction == null) {
      throw new IAE("Unknown direction[%s]", directionString);
    }

    return direction;
  }

  @Override
  public String toString()
  {
    return "OrderByColumnSpec{" +
           "dimension='" + dimension + '\'' +
           ", direction=" + direction +
           '}';
  }

  public static enum Direction
  {
    ASCENDING,
    DESCENDING
  }
}
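
For reference, determineDirection(...) accepts any case-insensitive, unambiguous prefix of an enum name; the tests below rely on this with "desc" and "d". A small sketch of the edge cases, derived directly from the logic above:

import com.metamx.druid.query.group.limit.OrderByColumnSpec;

public class DirectionParsingSketch
{
  public static void main(String[] args)
  {
    // Exact enum name hits the precomputed lookup map.
    System.out.println(OrderByColumnSpec.determineDirection("DESCENDING")); // DESCENDING

    // Unambiguous prefixes resolve via the startsWith() loop, case-insensitively.
    System.out.println(OrderByColumnSpec.determineDirection("desc"));       // DESCENDING
    System.out.println(OrderByColumnSpec.determineDirection("a"));          // ASCENDING

    // "" is a prefix of both names, so it throws ISE("Ambiguous directions...");
    // an unmatched string like "sideways" throws IAE("Unknown direction[sideways]").
  }
}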

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -9,7 +9,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -29,7 +29,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
  </parent>
 
  <dependencies>

View File

@@ -23,7 +23,7 @@
   <groupId>com.metamx</groupId>
   <artifactId>druid</artifactId>
   <packaging>pom</packaging>
-  <version>0.4.3-SNAPSHOT</version>
+  <version>0.4.4-SNAPSHOT</version>
   <name>druid</name>
   <description>druid</description>
   <scm>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -22,7 +22,6 @@ package com.metamx.druid.query.group;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -41,6 +40,7 @@ import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.QueryRunnerTestHelper;
 import com.metamx.druid.query.dimension.DefaultDimensionSpec;
 import com.metamx.druid.query.dimension.DimensionSpec;
+import com.metamx.druid.query.group.limit.OrderByColumnSpec;
 import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -126,7 +126,7 @@ public class GroupByQueryRunnerTest
         .setGranularity(QueryRunnerTestHelper.dayGran)
         .build();
 
-    List<Row> expectedResults = Arrays.<Row>asList(
+    List<Row> expectedResults = Arrays.asList(
         createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
         createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
         createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
@@ -187,25 +187,25 @@ public class GroupByQueryRunnerTest
         .build();
 
     List<Row> expectedResults = Arrays.asList(
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "automotive", "rows", 1L, "idx", 135L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "business", "rows", 1L, "idx", 118L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "entertainment", "rows", 1L, "idx", 158L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "health", "rows", 1L, "idx", 120L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "mezzanine", "rows", 3L, "idx", 2870L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "news", "rows", 1L, "idx", 121L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "premium", "rows", 3L, "idx", 2900L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "technology", "rows", 1L, "idx", 78L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.<String, Object>of("alias", "travel", "rows", 1L, "idx", 119L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "automotive", "rows", 1L, "idx", 147L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "business", "rows", 1L, "idx", 112L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "entertainment", "rows", 1L, "idx", 166L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "health", "rows", 1L, "idx", 113L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "mezzanine", "rows", 3L, "idx", 2447L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "news", "rows", 1L, "idx", 114L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "premium", "rows", 3L, "idx", 2505L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "technology", "rows", 1L, "idx", 97L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.<String, Object>of("alias", "travel", "rows", 1L, "idx", 126L))
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "business", "rows", 1L, "idx", 118L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "entertainment", "rows", 1L, "idx", 158L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "health", "rows", 1L, "idx", 120L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "mezzanine", "rows", 3L, "idx", 2870L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "news", "rows", 1L, "idx", 121L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "premium", "rows", 3L, "idx", 2900L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "technology", "rows", 1L, "idx", 78L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "travel", "rows", 1L, "idx", 119L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "automotive", "rows", 1L, "idx", 147L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "business", "rows", 1L, "idx", 112L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "entertainment", "rows", 1L, "idx", 166L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "health", "rows", 1L, "idx", 113L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "mezzanine", "rows", 3L, "idx", 2447L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "news", "rows", 1L, "idx", 114L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "premium", "rows", 3L, "idx", 2505L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "technology", "rows", 1L, "idx", 97L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L)
     );
 
     Iterable<Row> results = Sequences.toList(
@@ -252,7 +252,7 @@ public class GroupByQueryRunnerTest
       }
     );
 
-    List<Row> expectedResults = Arrays.<Row>asList(
+    List<Row> expectedResults = Arrays.asList(
         createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
         createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
         createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
@@ -267,7 +267,7 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
     TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
 
-    List<Row> allGranExpectedResults = Arrays.<Row>asList(
+    List<Row> allGranExpectedResults = Arrays.asList(
         createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
         createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L),
         createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
@@ -284,7 +284,92 @@ public class GroupByQueryRunnerTest
   }
 
-  private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
+  @Test
+  public void testGroupByOrderLimit() throws Exception
+  {
+    GroupByQuery.Builder builder = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .addOrderByColumn("rows")
+        .addOrderByColumn("alias", OrderByColumnSpec.Direction.DESCENDING)
+        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
+
+    final GroupByQuery query = builder.build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
+        createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
+        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L)
+    );
+
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
+    TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
+    TestHelper.assertExpectedObjects(
+        Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
+    );
+  }
+
+  @Test
+  public void testGroupByWithOrderLimit2() throws Exception
+  {
+    GroupByQuery.Builder builder = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .addOrderByColumn("rows", "desc")
+        .addOrderByColumn("alias", "d")
+        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
+
+    final GroupByQuery query = builder.build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
+        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L)
    );
+
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
+    TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
+    TestHelper.assertExpectedObjects(
+        Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
+    );
+  }
+
+  private Row createExpectedRow(final String timestamp, Object... vals)
+  {
+    return createExpectedRow(new DateTime(timestamp), vals);
+  }
+
+  private Row createExpectedRow(final DateTime timestamp, Object... vals)
   {
     Preconditions.checkArgument(vals.length % 2 == 0);
@@ -293,6 +378,6 @@ public class GroupByQueryRunnerTest
       theVals.put(vals[i].toString(), vals[i+1]);
     }
 
-    return new MapBasedRow(new DateTime(timestamp), theVals);
+    return new MapBasedRow(timestamp, theVals);
   }
 }

View File

@@ -34,7 +34,6 @@ import com.metamx.druid.query.timeseries.TimeseriesQuery;
 import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerTest;
 import com.metamx.druid.result.Result;
 import com.metamx.druid.result.TimeseriesResultValue;
-import org.joda.time.DateTime;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,6 +47,7 @@ import java.util.Collection;
 @RunWith(Parameterized.class)
 public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
 {
+  @SuppressWarnings("unchecked")
   @Parameterized.Parameters
   public static Collection<?> constructorFeeder() throws IOException
   {

View File

@@ -24,11 +24,11 @@
   <artifactId>druid-services</artifactId>
   <name>druid-services</name>
   <description>druid-services</description>
-  <version>0.4.3-SNAPSHOT</version>
+  <version>0.4.4-SNAPSHOT</version>
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.4.3-SNAPSHOT</version>
+    <version>0.4.4-SNAPSHOT</version>
   </parent>
 
   <dependencies>