mirror of https://github.com/apache/druid.git
Merge pull request #2381 from himanshug/gp_by_builder_bug
lazily create comparators for row columns when needed
commit 633ee6ee34
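The idea in the commit message ("lazily create comparators for row columns when needed") shows up most clearly in the DefaultLimitSpec hunk below: instead of eagerly building an Ordering for every known column, the spec keeps plain lookup maps and creates a comparator only for the columns that actually appear in the order-by clause. A minimal, self-contained sketch of that lazy pattern follows; the class and method names are hypothetical stand-ins rather than Druid APIs, and rows are simplified to plain maps.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Druid code: comparators are created only for the
// columns named in the order-by clause, never for every known column up front.
public class LazyOrderingSketch
{
  static Comparator<Map<String, Object>> buildComparator(List<String> orderByColumns)
  {
    Comparator<Map<String, Object>> ordering = (a, b) -> 0;
    for (String column : orderByColumns) {
      // Created here, on demand, once per ordered column.
      Comparator<Map<String, Object>> next =
          Comparator.comparing(row -> String.valueOf(row.get(column)));
      ordering = ordering.thenComparing(next);
    }
    return ordering;
  }

  public static void main(String[] args)
  {
    Map<String, Object> r1 = new HashMap<>();
    r1.put("k1", "b");
    Map<String, Object> r2 = new HashMap<>();
    r2.put("k1", "a");

    List<Map<String, Object>> rows = new ArrayList<>(List.of(r1, r2));
    rows.sort(buildComparator(List.of("k1")));
    System.out.println(rows); // [{k1=a}, {k1=b}]
  }
}

Columns that nobody orders on never get a comparator at all, which is what the DefaultLimitSpec change below achieves with its dimensionsMap/aggregatorsMap/postAggregatorsMap lookups.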
@@ -628,9 +628,6 @@ public class GroupByQuery extends BaseQuery<Row>
     if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
       return false;
     }
-    if (limitFn != null ? !limitFn.equals(that.limitFn) : that.limitFn != null) {
-      return false;
-    }
     if (postAggregatorSpecs != null
         ? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
         : that.postAggregatorSpecs != null) {
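The three lines removed above drop the derived limitFn function from GroupByQuery's equals(); equality is left to the declarative fields such as limitSpec, and the OrderByColumnSpec hunk further down adds the equals/hashCode that the new DefaultLimitSpecTest serde assertions rely on. A rough sketch of the pattern, with hypothetical names, assuming the function is merely a product of the spec:

import java.util.Objects;
import java.util.function.UnaryOperator;

// Hypothetical sketch: equality is defined by the declarative spec only; a
// function derived from that spec is excluded, so whether it has been
// materialized yet never affects equals()/hashCode().
public class DerivedFieldEqualitySketch
{
  private final String limitSpec;          // declarative part
  private UnaryOperator<String> limitFn;   // derived from limitSpec, not compared

  public DerivedFieldEqualitySketch(String limitSpec)
  {
    this.limitSpec = limitSpec;
  }

  public UnaryOperator<String> getLimitFn()
  {
    if (limitFn == null) {
      limitFn = s -> s + " limited by " + limitSpec; // built on first use
    }
    return limitFn;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return Objects.equals(limitSpec, ((DerivedFieldEqualitySketch) o).limitSpec);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(limitSpec);
  }

  public static void main(String[] args)
  {
    DerivedFieldEqualitySketch a = new DerivedFieldEqualitySketch("limit 10");
    DerivedFieldEqualitySketch b = new DerivedFieldEqualitySketch("limit 10");
    a.getLimitFn();                  // materialized on one instance only
    System.out.println(a.equals(b)); // true: the derived function is ignored
  }
}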
@@ -115,24 +115,32 @@ public class DefaultLimitSpec implements LimitSpec
       }
     };
 
-    Map<String, Ordering<Row>> possibleOrderings = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+    Map<String, DimensionSpec> dimensionsMap = Maps.newHashMap();
     for (DimensionSpec spec : dimensions) {
-      final String dimension = spec.getOutputName();
-      possibleOrderings.put(dimension, dimensionOrdering(dimension));
+      dimensionsMap.put(spec.getOutputName(), spec);
     }
 
+    Map<String, AggregatorFactory> aggregatorsMap = Maps.newHashMap();
     for (final AggregatorFactory agg : aggs) {
-      final String column = agg.getName();
-      possibleOrderings.put(column, metricOrdering(column, agg.getComparator()));
+      aggregatorsMap.put(agg.getName(), agg);
     }
 
+    Map<String, PostAggregator> postAggregatorsMap = Maps.newHashMap();
     for (PostAggregator postAgg : postAggs) {
-      final String column = postAgg.getName();
-      possibleOrderings.put(column, metricOrdering(column, postAgg.getComparator()));
+      postAggregatorsMap.put(postAgg.getName(), postAgg);
     }
 
     for (OrderByColumnSpec columnSpec : columns) {
-      Ordering<Row> nextOrdering = possibleOrderings.get(columnSpec.getDimension());
+      String columnName = columnSpec.getDimension();
+      Ordering<Row> nextOrdering = null;
+
+      if (postAggregatorsMap.containsKey(columnName)) {
+        nextOrdering = metricOrdering(columnName, postAggregatorsMap.get(columnName).getComparator());
+      } else if (aggregatorsMap.containsKey(columnName)) {
+        nextOrdering = metricOrdering(columnName, aggregatorsMap.get(columnName).getComparator());
+      } else if (dimensionsMap.containsKey(columnName)) {
+        nextOrdering = dimensionOrdering(columnName);
+      }
 
       if (nextOrdering == null) {
         throw new ISE("Unknown column in order clause[%s]", columnSpec);
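With the rewrite above, an order-by column name is resolved against post-aggregators first, then aggregators, then dimensions, and a comparator is created only for that column; columns nobody orders on never have getComparator() called. A small self-contained sketch of the same resolution order, using plain JDK types and hypothetical names (the real code builds Ordering<Row> instances via metricOrdering/dimensionOrdering):

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the lookup order when an order-by column becomes a
// comparator: post-aggregators win over aggregators, which win over dimensions,
// and the supplier is only invoked for the column actually requested.
public class OrderingResolutionSketch
{
  private final Map<String, Supplier<Comparator<Object>>> postAggregatorsMap = new HashMap<>();
  private final Map<String, Supplier<Comparator<Object>>> aggregatorsMap = new HashMap<>();
  private final Map<String, Supplier<Comparator<Object>>> dimensionsMap = new HashMap<>();

  public Comparator<Object> resolve(String columnName)
  {
    if (postAggregatorsMap.containsKey(columnName)) {
      return postAggregatorsMap.get(columnName).get();
    } else if (aggregatorsMap.containsKey(columnName)) {
      return aggregatorsMap.get(columnName).get();
    } else if (dimensionsMap.containsKey(columnName)) {
      return dimensionsMap.get(columnName).get();
    }
    throw new IllegalStateException("Unknown column in order clause[" + columnName + "]");
  }

  public static void main(String[] args)
  {
    OrderingResolutionSketch sketch = new OrderingResolutionSketch();
    // "k1" is registered both as a dimension (string compare) and as an
    // aggregator (numeric compare); the aggregator takes precedence, which is
    // the behaviour exercised by the new DefaultLimitSpecTest further down.
    sketch.dimensionsMap.put("k1", () -> Comparator.comparing(Object::toString));
    sketch.aggregatorsMap.put("k1", () -> Comparator.comparingLong(o -> ((Number) o).longValue()));

    Comparator<Object> ordering = sketch.resolve("k1");
    System.out.println(ordering.compare(2L, 10L)); // negative: numeric, not lexicographic
  }
}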
@@ -172,6 +172,33 @@ public class OrderByColumnSpec
     return direction;
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    OrderByColumnSpec that = (OrderByColumnSpec) o;
+
+    if (!dimension.equals(that.dimension)) {
+      return false;
+    }
+    return direction == that.direction;
+
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dimension.hashCode();
+    result = 31 * result + direction.hashCode();
+    return result;
+  }
+
   @Override
   public String toString()
   {
@@ -20,14 +20,19 @@
 package io.druid.query.groupby;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.Query;
 import io.druid.query.QueryRunnerTestHelper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.FieldAccessPostAggregator;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.orderby.DefaultLimitSpec;
+import io.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -53,6 +58,13 @@ public class GroupByQueryTest
             )
         )
         .setGranularity(QueryRunnerTestHelper.dayGran)
+        .setPostAggregatorSpecs(ImmutableList.<PostAggregator>of(new FieldAccessPostAggregator("x", "idx")))
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                ImmutableList.of(new OrderByColumnSpec("alias", OrderByColumnSpec.Direction.ASCENDING)),
+                100
+            )
+        )
         .build();
 
     String json = jsonMapper.writeValueAsString(query);
@@ -0,0 +1,200 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.groupby.orderby;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import io.druid.data.input.MapBasedRow;
+import io.druid.data.input.Row;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+import io.druid.query.aggregation.post.ConstantPostAggregator;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.segment.TestHelper;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class DefaultLimitSpecTest
+{
+  private final List<Row> testRowsList;
+  private final Sequence<Row> testRowsSequence;
+
+  public DefaultLimitSpecTest()
+  {
+    testRowsList = ImmutableList.of(
+        createRow("2011-04-01", "k1", 9.0d, "k2", 2L, "k3", 3L),
+        createRow("2011-04-01", "k1", 10.0d, "k2", 1L, "k3", 2L),
+        createRow("2011-04-01", "k1", 20.0d, "k2", 3L, "k3", 1L)
+    );
+
+    testRowsSequence = Sequences.simple(testRowsList);
+  }
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    ObjectMapper mapper = TestHelper.getObjectMapper();
+
+    //defaults
+    String json = "{\"type\": \"default\"}";
+
+    DefaultLimitSpec spec = mapper.readValue(
+        mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
+        DefaultLimitSpec.class
+    );
+
+    Assert.assertEquals(
+        new DefaultLimitSpec(null, null),
+        spec
+    );
+
+    //non-defaults
+    json = "{\n"
+           + " \"type\":\"default\",\n"
+           + " \"columns\":[{\"dimension\":\"d\",\"direction\":\"ASCENDING\"}],\n"
+           + " \"limit\":10\n"
+           + "}";
+
+    spec = mapper.readValue(
+        mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
+        DefaultLimitSpec.class
+    );
+
+    Assert.assertEquals(
+        new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.ASCENDING)), 10),
+        spec
+    );
+  }
+
+  @Test
+  public void testBuildSimple()
+  {
+    DefaultLimitSpec limitSpec = new DefaultLimitSpec(
+        ImmutableList.<OrderByColumnSpec>of(),
+        2
+    );
+
+    Function<Sequence<Row>, Sequence<Row>> limitFn = limitSpec.build(
+        ImmutableList.<DimensionSpec>of(),
+        ImmutableList.<AggregatorFactory>of(),
+        ImmutableList.<PostAggregator>of()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(testRowsList.get(0), testRowsList.get(1)),
+        Sequences.toList(limitFn.apply(testRowsSequence), new ArrayList<Row>())
+    );
+  }
+
+  @Test
+  public void testBuildWithExplicitOrder()
+  {
+    DefaultLimitSpec limitSpec = new DefaultLimitSpec(
+        ImmutableList.of(
+            new OrderByColumnSpec("k1", OrderByColumnSpec.Direction.ASCENDING)
+        ),
+        2
+    );
+
+    Function<Sequence<Row>, Sequence<Row>> limitFn = limitSpec.build(
+        ImmutableList.<DimensionSpec>of(
+            new DefaultDimensionSpec("k1", "k1")
+        ),
+        ImmutableList.<AggregatorFactory>of(
+            new LongSumAggregatorFactory("k2", "k2")
+        ),
+        ImmutableList.<PostAggregator>of(
+            new ConstantPostAggregator("k3", 1L)
+        )
+    );
+    Assert.assertEquals(
+        ImmutableList.of(testRowsList.get(1), testRowsList.get(2)),
+        Sequences.toList(limitFn.apply(testRowsSequence), new ArrayList<Row>())
+    );
+
+    // if there is an aggregator with same name then that is used to build ordering
+    limitFn = limitSpec.build(
+        ImmutableList.<DimensionSpec>of(
+            new DefaultDimensionSpec("k1", "k1")
+        ),
+        ImmutableList.<AggregatorFactory>of(
+            new LongSumAggregatorFactory("k1", "k1")
+        ),
+        ImmutableList.<PostAggregator>of(
+            new ConstantPostAggregator("k3", 1L)
+        )
+    );
+    Assert.assertEquals(
+        ImmutableList.of(testRowsList.get(0), testRowsList.get(1)),
+        Sequences.toList(limitFn.apply(testRowsSequence), new ArrayList<Row>())
+    );
+
+    // if there is a post-aggregator with same name then that is used to build ordering
+    limitFn = limitSpec.build(
+        ImmutableList.<DimensionSpec>of(
+            new DefaultDimensionSpec("k1", "k1")
+        ),
+        ImmutableList.<AggregatorFactory>of(
+            new LongSumAggregatorFactory("k2", "k2")
+        ),
+        ImmutableList.<PostAggregator>of(
+            new ArithmeticPostAggregator(
+                "k1",
+                "+",
+                ImmutableList.<PostAggregator>of(
+                    new ConstantPostAggregator("x", 1),
+                    new ConstantPostAggregator("y", 1))
+            )
+        )
+    );
+    Assert.assertEquals(
+        ImmutableList.of(testRowsList.get(0), testRowsList.get(1)),
+        Sequences.toList(limitFn.apply(testRowsSequence), new ArrayList<Row>())
+    );
+  }
+
+  private Row createRow(String timestamp, Object... vals)
+  {
+    Preconditions.checkArgument(vals.length % 2 == 0);
+
+    Map<String, Object> theVals = Maps.newHashMap();
+    for (int i = 0; i < vals.length; i += 2) {
+      theVals.put(vals[i].toString(), vals[i + 1]);
+    }
+
+    DateTime ts = new DateTime(timestamp);
+    return new MapBasedRow(ts, theVals);
+  }
+}