mirror of https://github.com/apache/druid.git
limit spec for group by
This commit is contained in:
parent
c89f4cc85f
commit
c15ac1fc90
|
@ -0,0 +1,150 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.group;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.druid.input.MapBasedRow;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DefaultLimitSpec implements LimitSpec
|
||||
{
|
||||
private static final byte CACHE_TYPE_ID = 0x0;
|
||||
private static Joiner JOINER = Joiner.on("");
|
||||
|
||||
private final List<String> orderBy;
|
||||
private final int limit;
|
||||
private final Comparator<Row> comparator;
|
||||
|
||||
@JsonCreator
|
||||
public DefaultLimitSpec(
|
||||
@JsonProperty("orderBy") List<String> orderBy,
|
||||
@JsonProperty("limit") int limit
|
||||
)
|
||||
{
|
||||
this.orderBy = (orderBy == null) ? Lists.<String>newArrayList() : orderBy;
|
||||
this.limit = limit;
|
||||
this.comparator = makeComparator();
|
||||
}
|
||||
|
||||
public DefaultLimitSpec()
|
||||
{
|
||||
this.orderBy = Lists.newArrayList();
|
||||
this.limit = 0;
|
||||
this.comparator = makeComparator();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public List<String> getOrderBy()
|
||||
{
|
||||
return orderBy;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public int getLimit()
|
||||
{
|
||||
return limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<Row> getComparator()
|
||||
{
|
||||
return comparator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
byte[] orderByBytes = JOINER.join(orderBy).getBytes();
|
||||
|
||||
byte[] limitBytes = Ints.toByteArray(limit);
|
||||
|
||||
return ByteBuffer.allocate(1 + orderByBytes.length + limitBytes.length)
|
||||
.put(CACHE_TYPE_ID)
|
||||
.put(orderByBytes)
|
||||
.put(limitBytes)
|
||||
.array();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "DefaultLimitSpec{" +
|
||||
"orderBy='" + orderBy + '\'' +
|
||||
", limit=" + limit +
|
||||
'}';
|
||||
}
|
||||
|
||||
private Comparator<Row> makeComparator()
|
||||
{
|
||||
Ordering<Row> ordering = new Ordering<Row>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Row left, Row right)
|
||||
{
|
||||
return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
|
||||
}
|
||||
};
|
||||
|
||||
for (final String dimension : orderBy) {
|
||||
ordering = ordering.compound(
|
||||
new Comparator<Row>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Row left, Row right)
|
||||
{
|
||||
if (left instanceof MapBasedRow && right instanceof MapBasedRow) {
|
||||
// There are no multi-value dimensions at this point, they should have been flattened out
|
||||
String leftDimVal = left.getDimension(dimension).get(0);
|
||||
String rightDimVal = right.getDimension(dimension).get(0);
|
||||
return leftDimVal.compareTo(rightDimVal);
|
||||
} else {
|
||||
throw new ISE("Unknown type for rows[%s, %s]", left.getClass(), right.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
final Ordering<Row> theOrdering = ordering;
|
||||
|
||||
return new Comparator<Row>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Row row, Row row2)
|
||||
{
|
||||
return theOrdering.compare(row, row2);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -49,6 +49,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return new Builder();
|
||||
}
|
||||
|
||||
private final LimitSpec limitSpec;
|
||||
private final DimFilter dimFilter;
|
||||
private final QueryGranularity granularity;
|
||||
private final List<DimensionSpec> dimensions;
|
||||
|
@ -59,6 +60,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
public GroupByQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||
@JsonProperty("limitSpec") LimitSpec limitSpec,
|
||||
@JsonProperty("filter") DimFilter dimFilter,
|
||||
@JsonProperty("granularity") QueryGranularity granularity,
|
||||
@JsonProperty("dimensions") List<DimensionSpec> dimensions,
|
||||
|
@ -68,6 +70,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
this.limitSpec = (limitSpec == null) ? new DefaultLimitSpec() : limitSpec;
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity;
|
||||
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
|
||||
|
@ -79,6 +82,12 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public LimitSpec getLimitSpec()
|
||||
{
|
||||
return limitSpec;
|
||||
}
|
||||
|
||||
@JsonProperty("filter")
|
||||
public DimFilter getDimFilter()
|
||||
{
|
||||
|
@ -127,6 +136,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return new GroupByQuery(
|
||||
getDataSource(),
|
||||
getQuerySegmentSpec(),
|
||||
limitSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
|
@ -142,6 +152,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return new GroupByQuery(
|
||||
getDataSource(),
|
||||
spec,
|
||||
limitSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
|
@ -155,6 +166,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
{
|
||||
private String dataSource;
|
||||
private QuerySegmentSpec querySegmentSpec;
|
||||
private LimitSpec limitSpec;
|
||||
private DimFilter dimFilter;
|
||||
private QueryGranularity granularity;
|
||||
private List<DimensionSpec> dimensions;
|
||||
|
@ -168,6 +180,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
{
|
||||
dataSource = builder.dataSource;
|
||||
querySegmentSpec = builder.querySegmentSpec;
|
||||
limitSpec = builder.limitSpec;
|
||||
dimFilter = builder.dimFilter;
|
||||
granularity = builder.granularity;
|
||||
dimensions = builder.dimensions;
|
||||
|
@ -187,6 +200,12 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return setQuerySegmentSpec(new LegacySegmentSpec(interval));
|
||||
}
|
||||
|
||||
public Builder setLimitSpec(LimitSpec limitSpec)
|
||||
{
|
||||
this.limitSpec = limitSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
|
||||
{
|
||||
this.querySegmentSpec = querySegmentSpec;
|
||||
|
@ -279,6 +298,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return new GroupByQuery(
|
||||
dataSource,
|
||||
querySegmentSpec,
|
||||
limitSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
|
|
|
@ -21,10 +21,12 @@ package com.metamx.druid.query.group;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.ConcatSequence;
|
||||
|
@ -44,11 +46,11 @@ import com.metamx.druid.query.QueryToolChest;
|
|||
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||
import com.metamx.druid.utils.PropUtils;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Minutes;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
@ -57,8 +59,9 @@ import java.util.Properties;
|
|||
*/
|
||||
public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
|
||||
{
|
||||
|
||||
private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>(){};
|
||||
private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>()
|
||||
{
|
||||
};
|
||||
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
|
||||
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
|
||||
|
||||
|
@ -81,22 +84,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
{
|
||||
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
|
||||
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
return runner.run(input);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Sequence<Row> mergeGroupByResults(GroupByQuery query, QueryRunner<Row> runner)
|
||||
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
|
||||
{
|
||||
final QueryGranularity gran = query.getGranularity();
|
||||
final long timeStart = query.getIntervals().get(0).getStartMillis();
|
||||
|
||||
// use gran.iterable instead of gran.truncate so that
|
||||
// AllGranularity returns timeStart instead of Long.MIN_VALUE
|
||||
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
|
||||
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
|
||||
|
||||
final List<AggregatorFactory> aggs = Lists.transform(
|
||||
query.getAggregatorSpecs(),
|
||||
|
@ -144,7 +146,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
);
|
||||
|
||||
// convert millis back to timestamp according to granularity to preserve time zone information
|
||||
return Sequences.map(
|
||||
Sequence<Row> retVal = Sequences.map(
|
||||
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
||||
new Function<Row, Row>()
|
||||
{
|
||||
|
@ -156,6 +158,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
}
|
||||
}
|
||||
);
|
||||
|
||||
// sort results to be returned
|
||||
if (!query.getLimitSpec().getOrderBy().isEmpty()) {
|
||||
retVal = Sequences.sort(retVal, query.getLimitSpec().getComparator());
|
||||
}
|
||||
|
||||
return Sequences.limit(
|
||||
retVal,
|
||||
(query.getLimitSpec().getLimit() > 0) ? query.getLimitSpec().getLimit() : maxRows
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -183,9 +195,24 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
}
|
||||
|
||||
@Override
|
||||
public Function<Row, Row> makeMetricManipulatorFn(GroupByQuery query, MetricManipulationFn fn)
|
||||
public Function<Row, Row> makeMetricManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn)
|
||||
{
|
||||
return Functions.identity();
|
||||
return new Function<Row, Row>()
|
||||
{
|
||||
@Override
|
||||
public Row apply(Row input)
|
||||
{
|
||||
if (input instanceof MapBasedRow) {
|
||||
final MapBasedRow inputRow = (MapBasedRow) input;
|
||||
final Map<String, Object> values = Maps.newHashMap(((MapBasedRow) input).getEvent());
|
||||
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
|
||||
values.put(agg.getName(), fn.manipulate(agg, inputRow.getEvent().get(agg.getName())));
|
||||
}
|
||||
return new MapBasedRow(inputRow.getTimestamp(), values);
|
||||
}
|
||||
return input;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.group;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.metamx.druid.input.Row;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class)
|
||||
})
|
||||
public interface LimitSpec
|
||||
{
|
||||
public List<String> getOrderBy();
|
||||
|
||||
public int getLimit();
|
||||
|
||||
public Comparator<Row> getComparator();
|
||||
|
||||
public byte[] getCacheKey();
|
||||
}
|
2
pom.xml
2
pom.xml
|
@ -38,7 +38,7 @@
|
|||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<metamx.java-util.version>0.22.0</metamx.java-util.version>
|
||||
<metamx.java-util.version>0.22.3</metamx.java-util.version>
|
||||
<netflix.curator.version>2.0.1-21-22</netflix.curator.version>
|
||||
</properties>
|
||||
|
||||
|
|
Loading…
Reference in New Issue