diff --git a/client/src/main/java/com/metamx/druid/query/group/DefaultLimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/DefaultLimitSpec.java
new file mode 100644
index 00000000000..4a09a6dfb79
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/group/DefaultLimitSpec.java
@@ -0,0 +1,150 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query.group;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.metamx.common.ISE;
+import com.metamx.druid.input.MapBasedRow;
+import com.metamx.druid.input.Row;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ */
+public class DefaultLimitSpec implements LimitSpec
+{
+  private static final byte CACHE_TYPE_ID = 0x0;
+  private static Joiner JOINER = Joiner.on("");
+
+  private final List<String> orderBy;
+  private final int limit;
+  private final Comparator<Row> comparator;
+
+  @JsonCreator
+  public DefaultLimitSpec(
+      @JsonProperty("orderBy") List<String> orderBy,
+      @JsonProperty("limit") int limit
+  )
+  {
+    this.orderBy = (orderBy == null) ? Lists.<String>newArrayList() : orderBy;
+    this.limit = limit;
+    this.comparator = makeComparator();
+  }
+
+  public DefaultLimitSpec()
+  {
+    this.orderBy = Lists.newArrayList();
+    this.limit = 0;
+    this.comparator = makeComparator();
+  }
+
+  @JsonProperty
+  @Override
+  public List<String> getOrderBy()
+  {
+    return orderBy;
+  }
+
+  @JsonProperty
+  @Override
+  public int getLimit()
+  {
+    return limit;
+  }
+
+  @Override
+  public Comparator<Row> getComparator()
+  {
+    return comparator;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    byte[] orderByBytes = JOINER.join(orderBy).getBytes();
+
+    byte[] limitBytes = Ints.toByteArray(limit);
+
+    return ByteBuffer.allocate(1 + orderByBytes.length + limitBytes.length)
+                     .put(CACHE_TYPE_ID)
+                     .put(orderByBytes)
+                     .put(limitBytes)
+                     .array();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DefaultLimitSpec{" +
+           "orderBy='" + orderBy + '\'' +
+           ", limit=" + limit +
+           '}';
+  }
+
+  private Comparator<Row> makeComparator()
+  {
+    Ordering<Row> ordering = new Ordering<Row>()
+    {
+      @Override
+      public int compare(Row left, Row right)
+      {
+        return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
+      }
+    };
+
+    for (final String dimension : orderBy) {
+      ordering = ordering.compound(
+          new Comparator<Row>()
+          {
+            @Override
+            public int compare(Row left, Row right)
+            {
+              if (left instanceof MapBasedRow && right instanceof MapBasedRow) {
+                // There are no multi-value dimensions at this point, they should have been flattened out
+                String leftDimVal = left.getDimension(dimension).get(0);
+                String rightDimVal = right.getDimension(dimension).get(0);
+                return leftDimVal.compareTo(rightDimVal);
+              } else {
+                throw new ISE("Unknown type for rows[%s, %s]", left.getClass(), right.getClass());
+              }
+            }
+          }
+      );
+    }
+    final Ordering<Row> theOrdering = ordering;
+
+    return new Comparator<Row>()
+    {
+      @Override
+      public int compare(Row row, Row row2)
+      {
+        return theOrdering.compare(row, row2);
+      }
+    };
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java
index df8ccedaa7a..6f8eb052cb1 100644
--- a/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java
+++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java
@@ -49,6 +49,7 @@ public class GroupByQuery extends BaseQuery<Row>
     return new Builder();
   }
 
+  private final LimitSpec limitSpec;
   private final DimFilter dimFilter;
   private final QueryGranularity granularity;
   private final List<DimensionSpec> dimensions;
@@ -59,6 +60,7 @@ public class GroupByQuery extends BaseQuery<Row>
   public GroupByQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+      @JsonProperty("limitSpec") LimitSpec limitSpec,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
       @JsonProperty("dimensions") List<DimensionSpec> dimensions,
@@ -68,6 +70,7 @@ public class GroupByQuery extends BaseQuery<Row>
   )
   {
     super(dataSource, querySegmentSpec, context);
+    this.limitSpec = (limitSpec == null) ? new DefaultLimitSpec() : limitSpec;
     this.dimFilter = dimFilter;
    this.granularity = granularity;
     this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
@@ -79,6 +82,12 @@ public class GroupByQuery extends BaseQuery<Row>
     Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
   }
 
+  @JsonProperty
+  public LimitSpec getLimitSpec()
+  {
+    return limitSpec;
+  }
+
   @JsonProperty("filter")
   public DimFilter getDimFilter()
   {
@@ -127,6 +136,7 @@ public class GroupByQuery extends BaseQuery<Row>
     return new GroupByQuery(
         getDataSource(),
         getQuerySegmentSpec(),
+        limitSpec,
         dimFilter,
         granularity,
         dimensions,
@@ -142,6 +152,7 @@ public class GroupByQuery extends BaseQuery<Row>
     return new GroupByQuery(
         getDataSource(),
         spec,
+        limitSpec,
         dimFilter,
         granularity,
         dimensions,
@@ -155,6 +166,7 @@ public class GroupByQuery extends BaseQuery<Row>
   {
     private String dataSource;
     private QuerySegmentSpec querySegmentSpec;
+    private LimitSpec limitSpec;
     private DimFilter dimFilter;
     private QueryGranularity granularity;
     private List<DimensionSpec> dimensions;
@@ -168,6 +180,7 @@ public class GroupByQuery extends BaseQuery<Row>
   {
     dataSource = builder.dataSource;
     querySegmentSpec = builder.querySegmentSpec;
+    limitSpec = builder.limitSpec;
     dimFilter = builder.dimFilter;
     granularity = builder.granularity;
     dimensions = builder.dimensions;
@@ -187,6 +200,12 @@ public class GroupByQuery extends BaseQuery<Row>
     return setQuerySegmentSpec(new LegacySegmentSpec(interval));
   }
 
+  public Builder setLimitSpec(LimitSpec limitSpec)
+  {
+    this.limitSpec = limitSpec;
+    return this;
+  }
+
   public Builder setQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
   {
     this.querySegmentSpec = querySegmentSpec;
@@ -279,6 +298,7 @@ public class GroupByQuery extends BaseQuery<Row>
     return new GroupByQuery(
         dataSource,
         querySegmentSpec,
+        limitSpec,
         dimFilter,
         granularity,
         dimensions,
diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
index f6e63b36acb..f7e81fea4d0 100644
--- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java
@@ -21,10 +21,12 @@ package com.metamx.druid.query.group;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.ConcatSequence;
@@ -44,11 +46,11 @@ import com.metamx.druid.query.QueryToolChest;
 import com.metamx.druid.query.dimension.DimensionSpec;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.emitter.service.ServiceMetricEvent;
-
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
 
 import javax.annotation.Nullable;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -57,8 +59,9 @@
  */
 public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
 {
-
-  private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>(){};
+  private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>()
+  {
+  };
 
   private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
   private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
@@ -81,22 +84,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
-  private Sequence<Row> mergeGroupByResults(GroupByQuery query, QueryRunner<Row> runner)
+  private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
     final QueryGranularity gran = query.getGranularity();
     final long timeStart = query.getIntervals().get(0).getStartMillis();
 
     // use gran.iterable instead of gran.truncate so that
     // AllGranularity returns timeStart instead of Long.MIN_VALUE
-    final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
+    final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
 
     final List<AggregatorFactory> aggs = Lists.transform(
         query.getAggregatorSpecs(),
@@ -144,7 +146,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
-    return Sequences.map(
+    Sequence<Row> retVal = Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {
@@ -156,6 +158,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
+    return Sequences.limit(
+        retVal,
+        (query.getLimitSpec().getLimit() > 0) ? query.getLimitSpec().getLimit() : maxRows
+    );
   }
 
   @Override
@@ -183,9 +195,24 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
   @Override
-  public Function<Row, Row> makeMetricManipulatorFn(GroupByQuery query, MetricManipulationFn fn)
+  public Function<Row, Row> makeMetricManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn)
   {
-    return Functions.identity();
+    return new Function<Row, Row>()
+    {
+      @Override
+      public Row apply(Row input)
+      {
+        if (input instanceof MapBasedRow) {
+          final MapBasedRow inputRow = (MapBasedRow) input;
+          final Map<String, Object> values = Maps.newHashMap(((MapBasedRow) input).getEvent());
+          for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+            values.put(agg.getName(), fn.manipulate(agg, inputRow.getEvent().get(agg.getName())));
+          }
+          return new MapBasedRow(inputRow.getTimestamp(), values);
+        }
+        return input;
+      }
+    };
   }
 
   @Override
diff --git a/client/src/main/java/com/metamx/druid/query/group/LimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/LimitSpec.java
new file mode 100644
index 00000000000..dab5414a16a
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/group/LimitSpec.java
@@ -0,0 +1,44 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query.group;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.metamx.druid.input.Row;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class)
+})
+public interface LimitSpec
+{
+  public List<String> getOrderBy();
+
+  public int getLimit();
+
+  public Comparator<Row> getComparator();
+
+  public byte[] getCacheKey();
+}
diff --git a/pom.xml b/pom.xml
index 7fc9fbe7438..e79dd0aaa57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <metamx.java-util.version>0.22.0</metamx.java-util.version>
+        <metamx.java-util.version>0.22.3</metamx.java-util.version>
         2.0.1-21-22
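
As a quick illustration of how the new spec behaves, here is a minimal standalone sketch. Assumptions not taken from the patch: the MapBasedRow (DateTime, Map<String, Object>) constructor (mirroring its use in makeMetricManipulatorFn above), the made-up "country"/"rows" names, and the DefaultLimitSpecSketch class itself; the subList call stands in for the Sequences.limit truncation that mergeGroupByResults performs.

    // Hypothetical sketch: exercises DefaultLimitSpec outside the query stack.
    package com.metamx.druid.query.group;

    import com.google.common.collect.ImmutableMap;
    import com.metamx.druid.input.MapBasedRow;
    import com.metamx.druid.input.Row;
    import org.joda.time.DateTime;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DefaultLimitSpecSketch
    {
      public static void main(String[] args)
      {
        // Rows are ordered by timestamp first (built into makeComparator()), then by "country".
        DefaultLimitSpec spec = new DefaultLimitSpec(Arrays.asList("country"), 2);

        DateTime time = new DateTime("2012-01-01T00:00:00.000Z");
        List<Row> rows = Arrays.<Row>asList(
            new MapBasedRow(time, ImmutableMap.<String, Object>of("country", "us", "rows", 10L)),
            new MapBasedRow(time, ImmutableMap.<String, Object>of("country", "fr", "rows", 3L)),
            new MapBasedRow(time, ImmutableMap.<String, Object>of("country", "de", "rows", 7L))
        );

        // Sort with the spec's comparator, then keep at most getLimit() rows,
        // which is roughly what mergeGroupByResults does via Sequences.limit.
        Collections.sort(rows, spec.getComparator());
        for (Row row : rows.subList(0, Math.min(spec.getLimit(), rows.size()))) {
          System.out.println(row);
        }
      }
    }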