diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 9ca527ea17a..70403e7d0ce 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -136,8 +136,13 @@ public class GroupByQuery extends BaseQuery<Row>
 
   private Function<Sequence<Row>, Sequence<Row>> makePostProcessingFn()
   {
-    Function<Sequence<Row>, Sequence<Row>> postProcessingFn =
-        limitSpec.build(dimensions, aggregatorSpecs, postAggregatorSpecs);
+    Function<Sequence<Row>, Sequence<Row>> postProcessingFn = limitSpec.build(
+        dimensions,
+        aggregatorSpecs,
+        postAggregatorSpecs,
+        getGranularity(),
+        getContextSortByDimsFirst()
+    );
 
     if (havingSpec != null) {
       postProcessingFn = Functions.compose(
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
index d44f51c2ebc..2cbaf81bfe9 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -32,6 +32,8 @@ import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import io.druid.data.input.Row;
 import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.query.aggregation.AggregatorFactory;
@@ -119,16 +121,14 @@ public class DefaultLimitSpec implements LimitSpec
   public Function<Sequence<Row>, Sequence<Row>> build(
       List<DimensionSpec> dimensions,
       List<AggregatorFactory> aggs,
-      List<PostAggregator> postAggs
+      List<PostAggregator> postAggs,
+      Granularity granularity,
+      boolean sortByDimsFirst
   )
   {
     // Can avoid re-sorting if the natural ordering is good enough.
-    boolean sortingNeeded = false;
-
-    if (dimensions.size() < columns.size()) {
-      sortingNeeded = true;
-    }
+    boolean sortingNeeded = dimensions.size() < columns.size();
 
     final Set<String> aggAndPostAggNames = Sets.newHashSet();
     for (AggregatorFactory agg : aggs) {
@@ -167,12 +167,17 @@ public class DefaultLimitSpec implements LimitSpec
       }
     }
 
+    if (!sortingNeeded) {
+      // If granularity is ALL, sortByDimsFirst doesn't change the sorting order.
+      sortingNeeded = !granularity.equals(Granularities.ALL) && sortByDimsFirst;
+    }
+
     if (!sortingNeeded) {
       return isLimited() ? new LimitingFn(limit) : Functions.identity();
     }
 
     // Materialize the Comparator first for fast-fail error checking.
-    final Ordering<Row> ordering = makeComparator(dimensions, aggs, postAggs);
+    final Ordering<Row> ordering = makeComparator(dimensions, aggs, postAggs, sortByDimsFirst);
 
     if (isLimited()) {
       return new TopNFunction(ordering, limit);
@@ -199,10 +204,13 @@ public class DefaultLimitSpec implements LimitSpec
   }
 
   private Ordering<Row> makeComparator(
-      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
+      List<DimensionSpec> dimensions,
+      List<AggregatorFactory> aggs,
+      List<PostAggregator> postAggs,
+      boolean sortByDimsFirst
   )
   {
-    Ordering<Row> ordering = new Ordering<Row>()
+    Ordering<Row> timeOrdering = new Ordering<Row>()
     {
       @Override
       public int compare(Row left, Row right)
@@ -226,6 +234,7 @@ public class DefaultLimitSpec implements LimitSpec
       postAggregatorsMap.put(postAgg.getName(), postAgg);
     }
 
+    Ordering<Row> ordering = null;
     for (OrderByColumnSpec columnSpec : columns) {
       String columnName = columnSpec.getDimension();
       Ordering<Row> nextOrdering = null;
@@ -246,7 +255,13 @@ public class DefaultLimitSpec implements LimitSpec
         nextOrdering = nextOrdering.reverse();
       }
 
-      ordering = ordering.compound(nextOrdering);
+      ordering = ordering == null ? nextOrdering : ordering.compound(nextOrdering);
+    }
+
+    if (ordering != null) {
+      ordering = sortByDimsFirst ? ordering.compound(timeOrdering) : timeOrdering.compound(ordering);
+    } else {
+      ordering = timeOrdering;
     }
 
     return ordering;
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
index 4638e6a49dc..2e0e489ce1e 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Function;
 import io.druid.data.input.Row;
 import io.druid.java.util.common.Cacheable;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
@@ -48,16 +49,20 @@ public interface LimitSpec extends Cacheable
   /**
    * Returns a function that applies a limit to an input sequence that is assumed to be sorted on dimensions.
   *
-   * @param dimensions query dimensions
-   * @param aggs       query aggregators
-   * @param postAggs   query postAggregators
+   * @param dimensions      query dimensions
+   * @param aggs            query aggregators
+   * @param postAggs        query postAggregators
+   * @param granularity     query granularity
+   * @param sortByDimsFirst 'sortByDimsFirst' value in queryContext
   *
   * @return limit function
   */
  Function<Sequence<Row>, Sequence<Row>> build(
      List<DimensionSpec> dimensions,
      List<AggregatorFactory> aggs,
-      List<PostAggregator> postAggs
+      List<PostAggregator> postAggs,
+      Granularity granularity,
+      boolean sortByDimsFirst
  );
 
  LimitSpec merge(LimitSpec other);
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
index 2ba458a6755..835c1d0a006 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import io.druid.data.input.Row;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
@@ -52,7 +53,9 @@ public final class NoopLimitSpec implements LimitSpec
   public Function<Sequence<Row>, Sequence<Row>> build(
       List<DimensionSpec> dimensions,
       List<AggregatorFactory> aggs,
-      List<PostAggregator> postAggs
+      List<PostAggregator> postAggs,
+      Granularity granularity,
+      boolean sortByDimsFirst
   )
   {
     return Functions.identity();
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 5650024503d..0d0615393b7 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -3531,6 +3531,68 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
 
+  @Test
+  public void testGroupByWithLookupAndLimitAndSortByDimsFirst()
+  {
+    Map<String, String> map = new HashMap<>();
+    map.put("automotive", "9");
+    map.put("business", "8");
+    map.put("entertainment", "7");
+    map.put("health", "6");
+    map.put("mezzanine", "5");
+    map.put("news", "4");
+    map.put("premium", "3");
+    map.put("technology", "2");
+    map.put("travel", "1");
+
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(
+            Lists.newArrayList(
+                new ExtractionDimensionSpec(
+                    "quality",
+                    "alias",
+                    new LookupExtractionFn(new MapLookupExtractor(map, false), false, null, false, false)
+                )
+            )
+        )
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .setLimitSpec(new DefaultLimitSpec(Lists.newArrayList(
+            new OrderByColumnSpec("alias", null, StringComparators.ALPHANUMERIC)), 11))
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .setContext(ImmutableMap.of("sortByDimsFirst", true))
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "1", "rows", 1L, "idx", 119L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "1", "rows", 1L, "idx", 126L),
+
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "2", "rows", 1L, "idx", 78L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "2", "rows", 1L, "idx", 97L),
+
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "3", "rows", 3L, "idx", 2900L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "3", "rows", 3L, "idx", 2505L),
+
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "4", "rows", 1L, "idx", 121L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "4", "rows", 1L, "idx", 114L),
+
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "5", "rows", 3L, "idx", 2870L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "5", "rows", 3L, "idx", 2447L),
+
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "6", "rows", 1L, "idx", 120L)
+    );
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
   @Ignore
   @Test
   // This is a test to verify per limit groupings, but Druid currently does not support this functionality. At a point
@@ -7436,7 +7498,9 @@ public class GroupByQueryRunnerTest
         query.getLimitSpec().build(
             query.getDimensions(),
             query.getAggregatorSpecs(),
-            query.getPostAggregatorSpecs()
+            query.getPostAggregatorSpecs(),
+            query.getGranularity(),
+            query.getContextSortByDimsFirst()
         )
     );
 
@@ -7497,7 +7561,9 @@ public class GroupByQueryRunnerTest
         query.getLimitSpec().build(
            query.getDimensions(),
            query.getAggregatorSpecs(),
-           query.getPostAggregatorSpecs()
+           query.getPostAggregatorSpecs(),
+           query.getGranularity(),
+           query.getContextSortByDimsFirst()
        )
    );
 
@@ -7700,7 +7766,9 @@ public class GroupByQueryRunnerTest
         query.getLimitSpec().build(
            query.getDimensions(),
            query.getAggregatorSpecs(),
-           query.getPostAggregatorSpecs()
+           query.getPostAggregatorSpecs(),
+           query.getGranularity(),
+           query.getContextSortByDimsFirst()
        )
    );
 
@@ -7762,7 +7830,9 @@ public class GroupByQueryRunnerTest
         query.getLimitSpec().build(
            query.getDimensions(),
            query.getAggregatorSpecs(),
-           query.getPostAggregatorSpecs()
+           query.getPostAggregatorSpecs(),
+           query.getGranularity(),
+           query.getContextSortByDimsFirst()
        )
    );
 
@@ -7823,7 +7893,9 @@ public class GroupByQueryRunnerTest
         query.getLimitSpec().build(
            query.getDimensions(),
            query.getAggregatorSpecs(),
-           query.getPostAggregatorSpecs()
+           query.getPostAggregatorSpecs(),
+           query.getGranularity(),
+           query.getContextSortByDimsFirst()
        )
    );
 
diff --git a/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java b/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java
index bb00b1fc038..64d354e99fd 100644
--- a/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.query.aggregation.AggregatorFactory;
@@ -40,6 +41,7 @@ import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.expression.TestExprMacroTable;
 import io.druid.query.ordering.StringComparators;
 import io.druid.segment.TestHelper;
+import io.druid.segment.column.ValueType;
 import org.junit.Assert;
 import org.junit.Test;
@@ -159,7 +161,9 @@ public class DefaultLimitSpecTest
     Function<Sequence<Row>, Sequence<Row>> limitFn = limitSpec.build(
         ImmutableList.of(),
         ImmutableList.of(),
-        ImmutableList.of()
+        ImmutableList.of(),
+        Granularities.NONE,
+        false
     );
 
     Assert.assertEquals(
@@ -168,6 +172,50 @@ public class DefaultLimitSpecTest
     );
   }
 
+  @Test
+  public void testWithAllGranularity()
+  {
+    DefaultLimitSpec limitSpec = new DefaultLimitSpec(
+        ImmutableList.of(new OrderByColumnSpec("k1", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC)),
+        2
+    );
+
+    Function<Sequence<Row>, Sequence<Row>> limitFn = limitSpec.build(
+        ImmutableList.of(new DefaultDimensionSpec("k1", "k1", ValueType.DOUBLE)),
+        ImmutableList.of(),
+        ImmutableList.of(),
+        Granularities.ALL,
+        true
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(testRowsList.get(0), testRowsList.get(1)),
+        limitFn.apply(testRowsSequence).toList()
+    );
+  }
+
+  @Test
+  public void testWithSortByDimsFirst()
+  {
+    DefaultLimitSpec limitSpec = new DefaultLimitSpec(
+        ImmutableList.of(new OrderByColumnSpec("k1", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC)),
+        2
+    );
+
+    Function<Sequence<Row>, Sequence<Row>> limitFn = limitSpec.build(
+        ImmutableList.of(new DefaultDimensionSpec("k1", "k1", ValueType.DOUBLE)),
+        ImmutableList.of(),
+        ImmutableList.of(),
+        Granularities.NONE,
+        true
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(testRowsList.get(2), testRowsList.get(0)),
+        limitFn.apply(testRowsSequence).toList()
+    );
+  }
+
   @Test
   public void testSortDimensionDescending()
   {
@@ -179,7 +227,9 @@ public class DefaultLimitSpecTest
     Function<Sequence<Row>, Sequence<Row>> limitFn = limitSpec.build(
         ImmutableList.of(new DefaultDimensionSpec("k1", "k1")),
         ImmutableList.of(),
-        ImmutableList.of()
+        ImmutableList.of(),
+        Granularities.NONE,
+        false
     );
 
     // Note: This test encodes the fact that limitSpec sorts numbers like strings; we might want to change this
@@ -209,7 +259,9 @@ public class DefaultLimitSpecTest
         ),
         ImmutableList.of(
             new ConstantPostAggregator("k3", 1L)
-        )
+        ),
+        Granularities.NONE,
+        false
     );
     Assert.assertEquals(
         ImmutableList.of(testRowsList.get(0), testRowsList.get(1)),
@@ -226,7 +278,9 @@ public class DefaultLimitSpecTest
         ),
         ImmutableList.of(
            new ConstantPostAggregator("k3", 1L)
-        )
+        ),
+        Granularities.NONE,
+        false
     );
     Assert.assertEquals(
         ImmutableList.of(testRowsList.get(2), testRowsList.get(0)),
@@ -249,7 +303,9 @@ public class DefaultLimitSpecTest
                 new ConstantPostAggregator("x", 1),
                 new ConstantPostAggregator("y", 1))
             )
-        )
+        ),
+        Granularities.NONE,
+        false
     );
     Assert.assertEquals(
         (List<Row>) ImmutableList.of(testRowsList.get(2), testRowsList.get(0)),
@@ -260,7 +316,9 @@
     limitFn = limitSpec.build(
         ImmutableList.of(new DefaultDimensionSpec("k1", "k1")),
         ImmutableList.of(new LongSumAggregatorFactory("k2", "k2")),
-        ImmutableList.of(new ExpressionPostAggregator("k1", "1 + 1", null, TestExprMacroTable.INSTANCE))
+        ImmutableList.of(new ExpressionPostAggregator("k1", "1 + 1", null, TestExprMacroTable.INSTANCE)),
+        Granularities.NONE,
+        false
     );
     Assert.assertEquals(
         (List<Row>) ImmutableList.of(testRowsList.get(2), testRowsList.get(0)),
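
Note (illustrative, not part of the patch): the heart of the DefaultLimitSpec change is which comparator becomes primary. Below is a minimal, self-contained sketch of that compounding logic using plain Guava orderings; RowLike is a hypothetical stand-in for io.druid.data.input.Row, and the compound() calls mirror how makeComparator now combines timeOrdering with the per-column ordering.

import com.google.common.collect.Ordering;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortByDimsFirstSketch
{
  // Hypothetical stand-in for a grouped row: a bucketed timestamp plus one dimension value.
  static class RowLike
  {
    final long timestamp;
    final String dim;

    RowLike(long timestamp, String dim)
    {
      this.timestamp = timestamp;
      this.dim = dim;
    }

    @Override
    public String toString()
    {
      return "[" + timestamp + ", " + dim + "]";
    }
  }

  public static void main(String[] args)
  {
    Ordering<RowLike> timeOrdering = Ordering.from(Comparator.comparingLong((RowLike r) -> r.timestamp));
    Ordering<RowLike> dimOrdering = Ordering.from(Comparator.comparing((RowLike r) -> r.dim));

    // Mirrors the patched makeComparator: with sortByDimsFirst the dimension ordering is primary
    // and the timestamp ordering only breaks ties; otherwise the timestamp ordering stays primary.
    boolean sortByDimsFirst = true;
    Ordering<RowLike> ordering = sortByDimsFirst
                                 ? dimOrdering.compound(timeOrdering)
                                 : timeOrdering.compound(dimOrdering);

    List<RowLike> rows = Arrays.asList(new RowLike(2L, "a"), new RowLike(1L, "b"), new RowLike(1L, "a"));

    // sortByDimsFirst = true  -> [[1, a], [2, a], [1, b]]
    // sortByDimsFirst = false -> [[1, a], [1, b], [2, a]]
    System.out.println(ordering.sortedCopy(rows));
  }
}

This also illustrates why the patch skips the re-sort for Granularities.ALL: with a single time bucket the timestamp comparator never distinguishes rows, so compounding it first or last produces the same order.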