diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index d0b7ee2de31..973aa7a22a3 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -28,6 +28,16 @@
          Reference: https://github.com/apache/druid/pull/7894/files -->
+
+    <!-- Suppress warnings about the unusual "equals" methods of Jackson JsonInclude filter classes.
+         See DefaultLimitSpec.LimitJsonIncludeFilter for an example and an explanation. -->
+    <Match>
+        <Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/>
+        <Or>
+            <Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
+            <Bug pattern="HE_EQUALS_USE_HASHCODE"/>
+        </Or>
+    </Match>
diff --git a/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java
new file mode 100644
index 00000000000..318751c3003
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Objects;
+
+/**
+ * Simultaneously sorts and limits its input.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+ *
+ * Not thread-safe.
+ *
+ * Note: this class doesn't have its own unit tests. It is tested along with
+ * {@link org.apache.druid.java.util.common.guava.TopNSequence} in "TopNSequenceTest".
+ */
+public class StableLimitingSorter<T>
+{
+  private final MinMaxPriorityQueue<NumberedElement<T>> queue;
+
+  private long count = 0;
+
+  public StableLimitingSorter(final Comparator<T> comparator, final int limit)
+  {
+    this.queue = MinMaxPriorityQueue
+        .orderedBy(
+            Ordering.from(
+                Comparator.<NumberedElement<T>, T>comparing(NumberedElement::getElement, comparator)
+                          .thenComparing(NumberedElement::getNumber)
+            )
+        )
+        .maximumSize(limit)
+        .create();
+  }
+
+  /**
+   * Offer an element to the sorter.
+   */
+  public void add(T element)
+  {
+    queue.offer(new NumberedElement<>(element, count++));
+  }
+
+  /**
+   * Drain elements in sorted order (least first).
+   */
+  public Iterator<T> drain()
+  {
+    return new Iterator<T>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return !queue.isEmpty();
+      }
+
+      @Override
+      public T next()
+      {
+        return queue.poll().getElement();
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @VisibleForTesting
+  static class NumberedElement<T>
+  {
+    private final T element;
+    private final long number;
+
+    public NumberedElement(T element, long number)
+    {
+      this.element = element;
+      this.number = number;
+    }
+
+    public T getElement()
+    {
+      return element;
+    }
+
+    public long getNumber()
+    {
+      return number;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      NumberedElement<?> that = (NumberedElement<?>) o;
+      return number == that.number &&
+             Objects.equals(element, that.element);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(element, number);
+    }
+  }
+}
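For reference, a minimal sketch of how the new sorter behaves. The wrapper class and its sample values below are illustrative only; `StableLimitingSorter` itself is the class added above:

```java
import org.apache.druid.collections.StableLimitingSorter;

import java.util.Comparator;
import java.util.Iterator;

class StableLimitingSorterExample
{
  public static void main(String[] args)
  {
    // Keep the 3 least elements by string length; "bb" and "cc" tie under the comparator.
    final StableLimitingSorter<String> sorter =
        new StableLimitingSorter<>(Comparator.comparingInt(String::length), 3);

    for (String s : new String[]{"bb", "cc", "aaa", "d"}) {
      sorter.add(s);
    }

    // Prints d, bb, cc: least first, ties in insertion order, and "aaa" evicted by the limit.
    for (Iterator<String> it = sorter.drain(); it.hasNext(); ) {
      System.out.println(it.next());
    }
  }
}
```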
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
index c17a638b183..13f612f4243 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.java.util.common.guava;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Ordering;
 
@@ -85,8 +86,21 @@ public interface Sequence<T>
     return accumulate(new ArrayList<>(), Accumulators.list());
   }
 
+  default Sequence<T> skip(long skip)
+  {
+    Preconditions.checkArgument(skip >= 0, "skip >= 0");
+
+    if (skip >= 1) {
+      return new SkippingSequence<>(this, skip);
+    } else {
+      return this;
+    }
+  }
+
   default Sequence<T> limit(long limit)
   {
+    Preconditions.checkArgument(limit >= 0, "limit >= 0");
+
     return new LimitedSequence<>(this, limit);
   }
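A quick sketch of how the new `skip` default method composes with `limit` (sample values are arbitrary):

```java
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;

import java.util.Arrays;

class SkipLimitExample
{
  public static void main(String[] args)
  {
    final Sequence<Integer> seq = Sequences.simple(Arrays.asList(0, 1, 2, 3, 4, 5));

    // skip(0) would return this sequence unchanged; skip(2) wraps it in a SkippingSequence.
    System.out.println(seq.skip(2).limit(3).toList()); // [2, 3, 4]
  }
}
```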
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
index df6fbe5cbd9..2a29db5e6c0 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 
 /**
+ *
  */
 public class Sequences
 {
@@ -131,10 +132,18 @@ public class Sequences
     };
   }
 
-  // This will materialize the entire sequence in memory. Use at your own risk.
+  /**
+   * Returns a sorted copy of the provided sequence.
+   *
+   * This will materialize the entire sequence in memory. Use at your own risk.
+   *
+   * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+   */
   public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
   {
     List<T> seqList = sequence.toList();
+
+    // Note: Collections.sort is guaranteed to be stable.
     Collections.sort(seqList, comparator);
     return simple(seqList);
   }
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java
new file mode 100644
index 00000000000..4eba7a5473f
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import org.apache.druid.java.util.common.IAE;
+
+import java.io.IOException;
+
+/**
+ * A Sequence that skips the first few elements.
+ */
+public class SkippingSequence<T> extends YieldingSequenceBase<T>
+{
+  private final Sequence<T> baseSequence;
+  private final long skip;
+
+  public SkippingSequence(Sequence<T> baseSequence, long skip)
+  {
+    this.baseSequence = baseSequence;
+    this.skip = skip;
+
+    if (skip < 1) {
+      throw new IAE("'skip' must be greater than zero");
+    }
+  }
+
+  @Override
+  public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
+  {
+    final SkippingYieldingAccumulator<OutType> skippingAccumulator = new SkippingYieldingAccumulator<>(accumulator);
+    return wrapYielder(baseSequence.toYielder(initValue, skippingAccumulator), skippingAccumulator);
+  }
+
+  private <OutType> Yielder<OutType> wrapYielder(
+      final Yielder<OutType> yielder,
+      final SkippingYieldingAccumulator<OutType> accumulator
+  )
+  {
+    return new Yielder<OutType>()
+    {
+      @Override
+      public OutType get()
+      {
+        return yielder.get();
+      }
+
+      @Override
+      public Yielder<OutType> next(OutType initValue)
+      {
+        return wrapYielder(yielder.next(initValue), accumulator);
+      }
+
+      @Override
+      public boolean isDone()
+      {
+        return yielder.isDone();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        yielder.close();
+      }
+    };
+  }
+
+  private class SkippingYieldingAccumulator<OutType> extends DelegatingYieldingAccumulator<OutType, T>
+  {
+    private long skipped = 0;
+
+    public SkippingYieldingAccumulator(final YieldingAccumulator<OutType, T> accumulator)
+    {
+      super(accumulator);
+    }
+
+    @Override
+    public OutType accumulate(OutType accumulated, T in)
+    {
+      if (skipped < skip) {
+        skipped++;
+        return accumulated;
+      } else {
+        return super.accumulate(accumulated, in);
+      }
+    }
+  }
+}
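And a sketch of the stability guarantee that the strengthened `Sequences.sort` javadoc above promises (sample data is made up):

```java
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;

import java.util.Arrays;
import java.util.Comparator;

class StableSortExample
{
  public static void main(String[] args)
  {
    // "cc" and "bb" compare as equal by length; the stable sort keeps "cc" first
    // because it appeared first in the input.
    final Sequence<String> sorted = Sequences.sort(
        Sequences.simple(Arrays.asList("cc", "aaa", "bb", "d")),
        Comparator.comparingInt(String::length)
    );
    System.out.println(sorted.toList()); // [d, cc, bb, aaa]
  }
}
```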
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
similarity index 53%
rename from processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java
rename to core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
index bada6d5eea4..0b37c0022e5 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
@@ -17,22 +17,24 @@
  * under the License.
  */
 
-package org.apache.druid.query.groupby.orderby;
+package org.apache.druid.java.util.common.guava;
 
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.collect.Ordering;
-import org.apache.druid.java.util.common.guava.Accumulator;
-import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.collections.StableLimitingSorter;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 
+/**
+ * Simultaneously sorts and limits its input.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+ */
 public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
 {
   public TopNSequence(
       final Sequence<T> input,
-      final Ordering<T> ordering,
+      final Comparator<T> ordering,
       final int limit
   )
   {
@@ -47,45 +49,18 @@ public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
           }
 
           // Materialize the topN values
-          final MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue
-              .orderedBy(ordering)
-              .maximumSize(limit)
-              .create();
+          final StableLimitingSorter<T> sorter = new StableLimitingSorter<>(ordering, limit);
 
           input.accumulate(
-              queue,
-              new Accumulator<MinMaxPriorityQueue<T>, T>()
-              {
-                @Override
-                public MinMaxPriorityQueue<T> accumulate(MinMaxPriorityQueue<T> theQueue, T row)
-                {
-                  theQueue.offer(row);
-                  return theQueue;
-                }
+              sorter,
+              (theSorter, element) -> {
+                theSorter.add(element);
+                return theSorter;
               }
           );
 
           // Now return them when asked
-          return new Iterator<T>()
-          {
-            @Override
-            public boolean hasNext()
-            {
-              return !queue.isEmpty();
-            }
-
-            @Override
-            public T next()
-            {
-              return queue.poll();
-            }
-
-            @Override
-            public void remove()
-            {
-              throw new UnsupportedOperationException();
-            }
-          };
+          return sorter.drain();
         }
 
         @Override
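The rewritten `TopNSequence` then combines both properties; a small sketch with made-up data:

```java
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.TopNSequence;

import java.util.Arrays;
import java.util.Comparator;

class TopNSequenceExample
{
  public static void main(String[] args)
  {
    // The two shortest strings; the length tie between "cc" and "bb" resolves to
    // whichever came first in the input, so the result is stable.
    final Sequence<String> topN = new TopNSequence<>(
        Sequences.simple(Arrays.asList("cc", "aaa", "bb", "d")),
        Comparator.comparingInt(String::length),
        2
    );
    System.out.println(topN.toList()); // [d, cc]
  }
}
```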
diff --git a/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java b/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java
new file mode 100644
index 00000000000..6007bc5089c
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class StableLimitingSorterTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(StableLimitingSorter.NumberedElement.class).usingGetClass().verify();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java
new file mode 100644
index 00000000000..0253794fd03
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SkippingSequenceTest
+{
+  private static final List<Integer> NUMS = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+  @Test
+  public void testSanityAccumulate() throws Exception
+  {
+    final int threshold = 5;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testTwo() throws Exception
+  {
+    final int threshold = 2;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testOne() throws Exception
+  {
+    final int threshold = 1;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testLimitThenSkip() throws Exception
+  {
+    final int skip = 2;
+    final int limit = 4;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).limit(limit).skip(skip),
+        Lists.newArrayList(Iterables.skip(Iterables.limit(NUMS, limit), skip))
+    );
+  }
+
+  @Test
+  public void testWithYieldingSequence()
+  {
+    // Create a Sequence whose Yielders will yield for each element, regardless of what the accumulator passed
+    // to "toYielder" does.
+    final BaseSequence<Integer, Iterator<Integer>> sequence = new BaseSequence<Integer, Iterator<Integer>>(
+        new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
+        {
+          @Override
+          public Iterator<Integer> make()
+          {
+            return NUMS.iterator();
+          }
+
+          @Override
+          public void cleanup(Iterator<Integer> iterFromMake)
+          {
+            // Do nothing.
+          }
+        }
+    )
+    {
+      @Override
+      public <OutType> Yielder<OutType> toYielder(
+          final OutType initValue,
+          final YieldingAccumulator<OutType, Integer> accumulator
+      )
+      {
+        return super.toYielder(
+            initValue,
+            new DelegatingYieldingAccumulator<OutType, Integer>(accumulator)
+            {
+              @Override
+              public OutType accumulate(OutType accumulated, Integer in)
+              {
+                final OutType retVal = super.accumulate(accumulated, in);
+                yield();
+                return retVal;
+              }
+            }
+        );
+      }
+    };
+
+    final int threshold = 4;
+
+    // Can't use "testAll" because its "testYield" implementation depends on the underlying Sequence _not_ yielding.
+    SequenceTestHelper.testAccumulation(
+        "",
+        sequence.skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testNoSideEffects()
+  {
+    final List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+    final AtomicLong accumulated = new AtomicLong(0);
+    final Sequence<Integer> seq = Sequences.simple(
+        Iterables.transform(
+            nums,
+            input -> {
+              accumulated.addAndGet(input);
+              return input;
+            }
+        )
+    ).limit(5);
+
+    Assert.assertEquals(10, seq.accumulate(0, new IntAdditionAccumulator()).intValue());
+    Assert.assertEquals(10, accumulated.get());
+    Assert.assertEquals(10, seq.accumulate(0, new IntAdditionAccumulator()).intValue());
+    Assert.assertEquals(20, accumulated.get());
+  }
+
+  private static class IntAdditionAccumulator implements Accumulator<Integer, Integer>
+  {
+    @Override
+    public Integer accumulate(Integer accumulated, Integer in)
+    {
+      return accumulated + in;
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java
new file mode 100644
index 00000000000..abf11ad51f8
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+
+@RunWith(Enclosed.class)
+public class TopNSequenceTest
+{
+  private static final List<String> EMPTY = Collections.emptyList();
+  private static final List<String> SINGLE = Collections.singletonList("a");
+  private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
+  private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
+
+  @RunWith(Parameterized.class)
+  public static class TopNSequenceAscDescTest
+  {
+    private static final Ordering<String> ASC = Ordering.natural();
+    private static final Ordering<String> DESC = Ordering.natural().reverse();
+
+    private Ordering<String> ordering;
+    private List<String> rawInput;
+    private int limit;
+
+    @Parameterized.Parameters(name = "comparator={0}, rawInput={1}, limit={2}")
+    public static Collection<Object[]> makeTestData()
+    {
+      Object[][] data = new Object[][]{
+          {ASC, RAW_ASC, RAW_ASC.size() - 2},
+          {ASC, RAW_ASC, RAW_ASC.size()},
+          {ASC, RAW_ASC, RAW_ASC.size() + 2},
+          {ASC, RAW_ASC, 0},
+          {ASC, SINGLE, 0},
+          {ASC, SINGLE, 1},
+          {ASC, SINGLE, 2},
+          {ASC, SINGLE, 3},
+          {ASC, EMPTY, 0},
+          {ASC, EMPTY, 1},
+          {DESC, RAW_DESC, RAW_DESC.size() - 2},
+          {DESC, RAW_DESC, RAW_DESC.size()},
+          {DESC, RAW_DESC, RAW_DESC.size() + 2},
+          {DESC, RAW_DESC, 0},
+          {DESC, RAW_DESC, 0},
+          {DESC, SINGLE, 1},
+          {DESC, SINGLE, 2},
+          {DESC, SINGLE, 3},
+          {DESC, EMPTY, 0},
+          {DESC, EMPTY, 1}
+      };
+
+      return Arrays.asList(data);
+    }
+
+    public TopNSequenceAscDescTest(Ordering<String> ordering, List<String> rawInput, int limit)
+    {
+      this.ordering = ordering;
+      this.rawInput = rawInput;
+      this.limit = limit;
+    }
+
+    @Test
+    public void testOrderByWithLimit()
+    {
+      List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
+      List<String> inputs = Lists.newArrayList(rawInput);
+      Collections.shuffle(inputs, new Random(2));
+
+      Sequence<String> result = new TopNSequence<>(Sequences.simple(inputs), ordering, limit);
+
+      Assert.assertEquals(expected, result.toList());
+    }
+  }
+
+  /**
+   * This class has test cases using a comparator that sometimes returns zero for unequal things.
+   */
+  @RunWith(Parameterized.class)
+  public static class TopNSequenceEvenOddTest
+  {
+    // 'a', 'c', 'e', ... all come before 'b', 'd', 'f', ...
+    private static final Ordering<String> EVENODD = Ordering.from(Comparator.comparing(s -> 1 - s.charAt(0) % 2));
+
+    private String expected;
+    private List<String> rawInput;
+
+    @Parameterized.Parameters(name = "rawInput={1}")
+    public static Collection<Object[]> makeTestData()
+    {
+      Object[][] data = new Object[][]{
+          {"acegikbdfhj", RAW_ASC},
+          {"kigecajhfdb", RAW_DESC}
+      };
+
+      return Arrays.asList(data);
+    }
+
+    public TopNSequenceEvenOddTest(String expected, List<String> rawInput)
+    {
+      this.expected = expected;
+      this.rawInput = rawInput;
+    }
+
+    @Test
+    public void testStability()
+    {
+      // Verify that the output of the sequence is stable relative to the input.
+      for (int limit = 0; limit < expected.length() + 1; limit++) {
+        final TopNSequence<String> sequence = new TopNSequence<>(Sequences.simple(rawInput), EVENODD, limit);
+        Assert.assertEquals(
+            "limit = " + limit,
+            expected.substring(0, Math.min(limit, expected.length())),
+            Joiner.on("").join(sequence.toList())
+        );
+      }
+    }
+  }
+}
diff --git a/docs/querying/limitspec.md b/docs/querying/limitspec.md
index 53cb86ae07a..086ac3bda9f 100644
--- a/docs/querying/limitspec.md
+++ b/docs/querying/limitspec.md
@@ -35,11 +35,24 @@ The default limit spec takes a limit and the list of columns to do an orderBy o
 
 ```json
 {
     "type"    : "default",
-    "limit"   : <integer_value>,
-    "columns" : [list of OrderByColumnSpec],
+    "limit"   : <optional integer>,
+    "offset"  : <optional integer>,
+    "columns" : [<optional list of OrderByColumnSpec>],
 }
 ```
 
+The "limit" parameter is the maximum number of rows to return.
+
+The "offset" parameter tells Druid to skip this many rows when returning results. If both "limit" and "offset" are
+provided, then "offset" will be applied first, followed by "limit". For example, a spec with limit 100 and offset 10
+will return up to 100 rows, starting from row number 10. Internally, the query is executed by extending the limit by
+the offset and then discarding a number of rows equal to the offset. This means that raising the offset will increase
+resource usage by an amount similar to increasing the limit.
+
+Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is
+modified in between page fetches in ways that affect overall query results, then the different pages will not
+necessarily align with each other.
+
 #### OrderByColumnSpec
 
 OrderByColumnSpecs indicate how to do order by operations. Each order-by condition can be a `jsonString` or a map of
 the following form:
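For example, a hypothetical spec that sorts on `count` descending and returns the second page of 100 results (all field values here are illustrative):

```json
{
  "type": "default",
  "limit": 100,
  "offset": 100,
  "columns": [
    { "dimension": "count", "direction": "descending" }
  ]
}
```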
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 54388b53cfe..decf8fb4d80 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -132,7 +132,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
       @JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
       @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
       @JsonProperty("having") @Nullable HavingSpec havingSpec,
-      @JsonProperty("limitSpec") LimitSpec limitSpec,
+      @JsonProperty("limitSpec") @Nullable LimitSpec limitSpec,
       @JsonProperty("subtotalsSpec") @Nullable List<List<String>> subtotalsSpec,
       @JsonProperty("context") Map<String, Object> context
   )
@@ -183,7 +183,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
       final @Nullable List<AggregatorFactory> aggregatorSpecs,
       final @Nullable List<PostAggregator> postAggregatorSpecs,
       final @Nullable HavingSpec havingSpec,
-      final LimitSpec limitSpec,
+      final @Nullable LimitSpec limitSpec,
       final @Nullable List<List<String>> subtotalsSpec,
       final @Nullable Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn,
       final Map<String, Object> context
   )
@@ -483,10 +483,10 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();
 
     if (limitSpec instanceof DefaultLimitSpec) {
-      DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;
+      DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit();
 
       // If only applying an orderby without a limit, don't try to push down
-      if (!defaultLimitSpec.isLimited()) {
+      if (!limitSpecWithoutOffset.isLimited()) {
         return false;
       }
 
@@ -1153,7 +1153,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
       if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
         theLimitSpec = NoopLimitSpec.instance();
       } else {
-        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, 0, limit);
       }
     } else {
       theLimitSpec = limitSpec;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 85ad1dd4511..d3aaa4fd3ee 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -621,6 +621,9 @@ public class GroupByQueryEngineV2
     }
 
     if (canDoLimitPushdown) {
+      // Sanity check; must not have "offset" at this point.
+      Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
+
       LimitedBufferHashGrouper<ByteBuffer> limitGrouper = new LimitedBufferHashGrouper<>(
           Suppliers.ofInstance(buffer),
           keySerde,
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 9cf29a400e2..e54a1877aac 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
 import net.jpountz.lz4.LZ4BlockInputStream;
@@ -98,6 +99,9 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
     this.keyObjComparator = keySerdeFactory.objectComparator(false);
     this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true);
     if (limitSpec != null) {
+      // Sanity check; must not have "offset" at this point.
+      Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
+
       LimitedBufferHashGrouper<KeyType> limitGrouper = new LimitedBufferHashGrouper<>(
           bufferSupplier,
           keySerde,
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
index 20f3e8c42e6..56ad4b85c7a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -20,14 +20,15 @@
 package org.apache.druid.query.groupby.orderby;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
-import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Rows;
@@ -35,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.TopNSequence;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
 import org.apache.druid.query.dimension.DimensionSpec;
@@ -47,11 +49,14 @@ import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -63,8 +68,14 @@ public class DefaultLimitSpec implements LimitSpec
   private static final byte CACHE_KEY = 0x1;
 
   private final List<OrderByColumnSpec> columns;
+  private final int offset;
   private final int limit;
 
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
   /**
    * Check if a limitSpec has columns in the sorting order that are not part of the grouping fields represented
    * by `dimensions`.
@@ -98,13 +109,28 @@ public class DefaultLimitSpec implements LimitSpec
   @JsonCreator
   public DefaultLimitSpec(
       @JsonProperty("columns") List<OrderByColumnSpec> columns,
+      @JsonProperty("offset") Integer offset,
       @JsonProperty("limit") Integer limit
   )
   {
     this.columns = (columns == null) ? ImmutableList.of() : columns;
+    this.offset = (offset == null) ? 0 : offset;
     this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
 
-    Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit);
+    Preconditions.checkArgument(this.offset >= 0, "offset[%s] must be >= 0", this.offset);
+    Preconditions.checkArgument(this.limit > 0, "limit[%s] must be > 0", this.limit);
+  }
+
+  /**
+   * Constructor that does not accept "offset". Useful for tests that only want to provide "columns" and "limit".
+   */
+  @VisibleForTesting
+  public DefaultLimitSpec(
+      final List<OrderByColumnSpec> columns,
+      final Integer limit
+  )
+  {
+    this(columns, 0, limit);
   }
 
   @JsonProperty
@@ -113,12 +139,32 @@ public class DefaultLimitSpec implements LimitSpec
     return columns;
   }
 
+  /**
+   * Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid.
+   */
   @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public int getOffset()
+  {
+    return offset;
+  }
+
+  /**
+   * Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Integer#MAX_VALUE} is used in
+   * situations where the user wants an effectively unlimited resultset.
+   */
+  @JsonProperty
+  @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitJsonIncludeFilter.class)
   public int getLimit()
   {
     return limit;
   }
 
+  public boolean isOffset()
+  {
+    return offset > 0;
+  }
+
   public boolean isLimited()
   {
     return limit < Integer.MAX_VALUE;
@@ -174,24 +220,39 @@ public class DefaultLimitSpec implements LimitSpec
       sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst();
     }
 
-    if (!sortingNeeded) {
-      return isLimited() ? new LimitingFn(limit) : Functions.identity();
+    final Function<Sequence<ResultRow>, Sequence<ResultRow>> sortAndLimitFn;
+
+    if (sortingNeeded) {
+      // Materialize the Comparator first for fast-fail error checking.
+      final Ordering<ResultRow> ordering = makeComparator(
+          query.getResultRowSignature(),
+          query.getResultRowHasTimestamp(),
+          query.getDimensions(),
+          query.getAggregatorSpecs(),
+          query.getPostAggregatorSpecs(),
+          query.getContextSortByDimsFirst()
+      );
+
+      // Both branches use a stable sort; important so consistent results are returned from query to query if the
+      // underlying data isn't changing. (Useful for query reproducibility and offset-based pagination.)
+      if (isLimited()) {
+        sortAndLimitFn = results -> new TopNSequence<>(results, ordering, limit + offset);
+      } else {
+        // The (long) cast avoids int overflow, since "limit" is Integer.MAX_VALUE in this branch.
+        sortAndLimitFn = results -> Sequences.sort(results, ordering).limit((long) limit + offset);
+      }
+    } else {
+      if (isLimited()) {
+        sortAndLimitFn = results -> results.limit((long) limit + offset);
+      } else {
+        sortAndLimitFn = Functions.identity();
+      }
     }
 
-    // Materialize the Comparator first for fast-fail error checking.
-    final Ordering<ResultRow> ordering = makeComparator(
-        query.getResultRowSignature(),
-        query.getResultRowHasTimestamp(),
-        query.getDimensions(),
-        query.getAggregatorSpecs(),
-        query.getPostAggregatorSpecs(),
-        query.getContextSortByDimsFirst()
-    );
-
-    if (isLimited()) {
-      return new TopNFunction(ordering, limit);
+    // Finally, apply offset after sorting and limiting.
+    if (isOffset()) {
+      return results -> sortAndLimitFn.apply(results).skip(offset);
     } else {
-      return new SortingFn(ordering);
+      return sortAndLimitFn;
     }
   }
@@ -217,10 +278,38 @@ public class DefaultLimitSpec implements LimitSpec
   {
     return new DefaultLimitSpec(
         columns.stream().filter(c -> names.contains(c.getDimension())).collect(Collectors.toList()),
+        offset,
         limit
     );
   }
 
+  /**
+   * Returns a new DefaultLimitSpec identical to this one except for one difference: an offset parameter, if any, will
+   * be removed and added to the limit. This is designed for passing down queries to lower levels of the stack. Only
+   * the highest level should apply the offset parameter, and any pushed-down limits must be increased to accommodate
+   * the offset.
+   */
+  public DefaultLimitSpec withOffsetToLimit()
+  {
+    if (isOffset()) {
+      final int newLimit;
+
+      if (limit == Integer.MAX_VALUE) {
+        // Unlimited stays unlimited.
+        newLimit = Integer.MAX_VALUE;
+      } else if (limit > Integer.MAX_VALUE - offset) {
+        // Handle overflow as best we can.
+        throw new ISE("Cannot apply limit[%d] with offset[%d] due to overflow", limit, offset);
+      } else {
+        newLimit = limit + offset;
+      }
+
+      return new DefaultLimitSpec(columns, 0, newLimit);
+    } else {
+      return this;
+    }
+  }
+
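A brief sketch of the conversion this method performs (builder values are arbitrary):

```java
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;

class OffsetToLimitExample
{
  public static void main(String[] args)
  {
    // A spec selecting rows 10..109: offset 10, limit 100.
    final DefaultLimitSpec spec =
        DefaultLimitSpec.builder().orderBy("d0").limit(100).offset(10).build();

    // Lower levels of the stack receive limit 110 and no offset; the offset itself
    // is applied once, at the outermost level, via Sequence.skip.
    System.out.println(spec.withOffsetToLimit()); // offset=0, limit=110
  }
}
```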
   private Ordering<ResultRow> makeComparator(
       RowSignature rowSignature,
       boolean hasTimestamp,
@@ -331,60 +420,11 @@ public class DefaultLimitSpec implements LimitSpec
   {
     return "DefaultLimitSpec{" +
            "columns='" + columns + '\'' +
+           ", offset=" + offset +
            ", limit=" + limit +
            '}';
   }
 
-  private static class LimitingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
-  {
-    private final int limit;
-
-    public LimitingFn(int limit)
-    {
-      this.limit = limit;
-    }
-
-    @Override
-    public Sequence<ResultRow> apply(Sequence<ResultRow> input)
-    {
-      return input.limit(limit);
-    }
-  }
-
-  private static class SortingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
-  {
-    private final Ordering<ResultRow> ordering;
-
-    public SortingFn(Ordering<ResultRow> ordering)
-    {
-      this.ordering = ordering;
-    }
-
-    @Override
-    public Sequence<ResultRow> apply(@Nullable Sequence<ResultRow> input)
-    {
-      return Sequences.sort(input, ordering);
-    }
-  }
-
-  private static class TopNFunction implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
-  {
-    private final Ordering<ResultRow> ordering;
-    private final int limit;
-
-    public TopNFunction(Ordering<ResultRow> ordering, int limit)
-    {
-      this.ordering = ordering;
-      this.limit = limit;
-    }
-
-    @Override
-    public Sequence<ResultRow> apply(final Sequence<ResultRow> input)
-    {
-      return new TopNSequence<>(input, ordering, limit);
-    }
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -394,25 +434,16 @@ public class DefaultLimitSpec implements LimitSpec
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     DefaultLimitSpec that = (DefaultLimitSpec) o;
-
-    if (limit != that.limit) {
-      return false;
-    }
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
-      return false;
-    }
-
-    return true;
+    return offset == that.offset &&
+           limit == that.limit &&
+           Objects.equals(columns, that.columns);
   }
 
   @Override
   public int hashCode()
   {
-    int result = columns != null ? columns.hashCode() : 0;
-    result = 31 * result + limit;
-    return result;
+    return Objects.hash(columns, offset, limit);
   }
 
   @Override
@@ -427,12 +458,83 @@ public class DefaultLimitSpec implements LimitSpec
       ++index;
     }
 
-    ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 4)
+    ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 2 * Integer.BYTES)
         .put(CACHE_KEY);
     for (byte[] columnByte : columnBytes) {
       buffer.put(columnByte);
     }
-    buffer.put(Ints.toByteArray(limit));
+
+    buffer.putInt(limit);
+    buffer.putInt(offset);
+
     return buffer.array();
   }
+
+  public static class Builder
+  {
+    private List<OrderByColumnSpec> columns = Collections.emptyList();
+    private Integer offset = null;
+    private Integer limit = null;
+
+    private Builder()
+    {
+    }
+
+    public Builder orderBy(final String... columns)
+    {
+      return orderBy(
+          Arrays.stream(columns)
+                .map(s -> new OrderByColumnSpec(s, OrderByColumnSpec.Direction.ASCENDING))
+                .toArray(OrderByColumnSpec[]::new)
+      );
+    }
+
+    public Builder orderBy(final OrderByColumnSpec... columns)
+    {
+      this.columns = ImmutableList.copyOf(Arrays.asList(columns));
+      return this;
+    }
+
+    public Builder offset(final int offset)
+    {
+      this.offset = offset;
+      return this;
+    }
+
+    public Builder limit(final int limit)
+    {
+      this.limit = limit;
+      return this;
+    }
+
+    public DefaultLimitSpec build()
+    {
+      return new DefaultLimitSpec(columns, offset, limit);
+    }
+  }
+
+  /**
+   * {@link JsonInclude} filter for {@link #getLimit()}.
+   *
+   * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
+   * exclusions (see spotbugs-exclude.xml).
+   */
+  @SuppressWarnings("EqualsAndHashcode")
+  static class LimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
+  {
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null) {
+        return false;
+      }
+
+      if (obj.getClass() == this.getClass()) {
+        return true;
+      }
+
+      // "limit" is an int, so the serialized value arrives here boxed as an Integer.
+      return obj instanceof Integer && (int) obj == Integer.MAX_VALUE;
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 455cb7dd320..e81eded2f9c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -61,6 +61,7 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
 import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
 import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
 import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.LimitSpec;
 import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
 import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@@ -221,7 +222,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
         query.getPostAggregatorSpecs(),
         // Don't do "having" clause until the end of this method.
         null,
-        query.getLimitSpec(),
+        // Potentially pass limit down the stack (i.e. limit pushdown). Notes:
+        //   (1) Limit pushdown is only supported for DefaultLimitSpec.
+        //   (2) When pushing down a limit, it must be extended to include the offset (the offset will be applied
+        //       higher-up).
+        query.isApplyLimitPushDown() ? ((DefaultLimitSpec) query.getLimitSpec()).withOffsetToLimit() : null,
         query.getSubtotalsSpec(),
         query.getContext()
     ).withOverriddenContext(
@@ -386,7 +391,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
     );
 
     List<String> queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
-                                                  .collect(Collectors.toList());
+                                                  .collect(Collectors.toList()); // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec.
Set aggsAndPostAggs = null; diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java index d67e234d494..33e16db5243 100644 --- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java @@ -606,9 +606,16 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest List expectedResults = Arrays.asList( GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t3t3", "count", 4L), GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t5t5", "count", 4L), - GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t4t4", "count", 2L), - GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L), - GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t7t7", "count", 2L) + GroupByQueryRunnerTestHelper.createExpectedRow( + query, + "1970", + "texpr", + NullHandling.emptyToNullIfNeeded(""), + "count", + 2L + ), + GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t1t1", "count", 2L), + GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L) ); TestHelper.assertExpectedObjects(expectedResults, result.toList(), "expr-multi-multi-auto-auto-self"); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 3ab559a5510..393c558d879 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3043,14 +3043,16 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest } @Test - public void testMergeResultsWithLimit() + public void testMergeResultsWithLimitAndOffset() { for (int limit = 1; limit < 20; ++limit) { - doTestMergeResultsWithValidLimit(limit); + for (int offset = 0; offset < 21; ++offset) { + doTestMergeResultsWithValidLimit(limit, offset); + } } } - private void doTestMergeResultsWithValidLimit(final int limit) + private void doTestMergeResultsWithValidLimit(final int limit, final int offset) { GroupByQuery.Builder builder = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -3058,7 +3060,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setDimensions(new DefaultDimensionSpec("quality", "alias")) .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) - .setLimit(limit); + .setLimitSpec(DefaultLimitSpec.builder().limit(limit).offset(offset).build()); final GroupByQuery fullQuery = builder.build(); @@ -3158,7 +3160,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest QueryRunner mergeRunner = factory.getToolchest().mergeResults(runner); TestHelper.assertExpectedObjects( - Iterables.limit(expectedResults, limit), + Iterables.limit(Iterables.skip(expectedResults, offset), limit), mergeRunner.run(QueryPlus.wrap(fullQuery)), StringUtils.format("limit: %d", limit) ); @@ -3700,7 +3702,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest query, "1970-01-01T00:00:00.000Z", "market", - "upfront", + "total_market", 
QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, @@ -3710,7 +3712,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest query, "1970-01-01T00:00:00.000Z", "market", - "total_market", + "upfront", QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, @@ -3867,7 +3869,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest query, "1970-01-01T00:00:00.000Z", "market", - "upfront", + "total_market", QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, @@ -3877,7 +3879,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest query, "1970-01-01T00:00:00.000Z", "market", - "total_market", + "upfront", QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, @@ -7198,10 +7200,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest ImmutableList.of("market"), ImmutableList.of() )) - .addOrderByColumn("idx") - .addOrderByColumn("alias") - .addOrderByColumn("market") - .setLimit(3) + .setLimitSpec(DefaultLimitSpec.builder().limit(3).orderBy("idx", "alias", "market").build()) .build(); List expectedResults = Arrays.asList( @@ -7214,6 +7213,44 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit"); } + @Test + public void testGroupByWithSubtotalsSpecWithOrderLimitAndOffset() + { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .setLimitSpec(DefaultLimitSpec.builder().limit(2).offset(1).orderBy("idx", "alias", "market").build()) + .build(); + + List expectedResults = Arrays.asList( + makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit"); + } + @Test public void testGroupByWithTimeColumn() { @@ -9860,6 +9897,55 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); } + @Test + public void testGroupByLimitPushDownWithOffset() + { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN).setDimensions(new DefaultDimensionSpec( + QueryRunnerTestHelper.MARKET_DIMENSION, + "marketalias" + )) + 
.setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setLimitSpec( + new DefaultLimitSpec( + Collections.singletonList(new OrderByColumnSpec( + "marketalias", + OrderByColumnSpec.Direction.DESCENDING + )), + 1, + 2 + ) + ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)) + .build(); + + List expectedResults = Arrays.asList( + makeRow( + query, + "1970-01-01T00:00:00.000Z", + "marketalias", + "total_market", + "rows", + 186L + ), + makeRow( + query, + "1970-01-01T00:00:00.000Z", + "marketalias", + "spot", + "rows", + 837L + ) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); + } + @Test public void testGroupByLimitPushDownWithLongDimensionNotInLimitSpec() { @@ -10076,6 +10162,72 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged"); } + @Test + public void testMergeResultsWithLimitPushDownSortByAggWithOffset() + { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + GroupByQuery.Builder builder = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) + .setLimitSpec( + new DefaultLimitSpec( + Collections.singletonList(new OrderByColumnSpec("idx", OrderByColumnSpec.Direction.DESCENDING)), + 2, + 3 + ) + ) + .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)) + .setGranularity(Granularities.ALL); + + final GroupByQuery allGranQuery = builder.build(); + + QueryRunner mergedRunner = factory.getToolchest().mergeResults( + new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) + { + // simulate two daily segments + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) + ); + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) + ); + + return factory.getToolchest().mergeResults( + (queryPlus3, responseContext1) -> new MergeSequence<>( + queryPlus3.getQuery().getResultOrdering(), + Sequences.simple( + Arrays.asList( + runner.run(queryPlus1, responseContext1), + runner.run(queryPlus2, responseContext1) + ) + ) + ) + ).run(queryPlus, responseContext); + } + } + ); + + List allGranExpectedResults = Arrays.asList( + makeRow(allGranQuery, "2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L), + makeRow(allGranQuery, "2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L), + makeRow(allGranQuery, "2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L) + ); + + Iterable results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList(); + TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged"); + } + @Test public void testMergeResultsWithLimitPushDownSortByDimDim() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java 
b/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java index 8122a0e200c..51daf2661cb 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java @@ -36,7 +36,9 @@ import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ValueType; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.List; @@ -45,6 +47,9 @@ import java.util.List; */ public class DefaultLimitSpecTest { + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + private final List testRowsList; private final List testRowsWithTimestampList; @@ -271,4 +276,36 @@ public class DefaultLimitSpecTest limitFn.apply(Sequences.simple(testRowsList)).toList() ); } + + @Test + public void testWithOffsetToLimit() + { + final DefaultLimitSpec limitSpec = DefaultLimitSpec.builder().orderBy("abc").limit(1).offset(2).build(); + Assert.assertEquals( + DefaultLimitSpec.builder().orderBy("abc").limit(3).build(), + limitSpec.withOffsetToLimit() + ); + } + + @Test + public void testWithOffsetToLimitUnlimited() + { + final DefaultLimitSpec limitSpec = DefaultLimitSpec.builder().orderBy("abc").offset(2).build(); + Assert.assertEquals( + DefaultLimitSpec.builder().orderBy("abc").build(), + limitSpec.withOffsetToLimit() + ); + } + + @Test + public void testWithOffsetToLimitTooCloseToMaxValue() + { + final DefaultLimitSpec limitSpec = + DefaultLimitSpec.builder().orderBy("abc").limit(Integer.MAX_VALUE - 1).offset(2).build(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Cannot apply limit[2147483646] with offset[2] due to overflow"); + + limitSpec.withOffsetToLimit(); + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java b/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java deleted file mode 100644 index 24238af0ab8..00000000000 --- a/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.query.groupby.orderby; - -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Random; - - -@RunWith(Parameterized.class) -public class TopNSequenceTest -{ - private static final Ordering ASC = Ordering.natural(); - private static final Ordering DESC = Ordering.natural().reverse(); - - private static final List EMPTY = Collections.emptyList(); - private static final List SINGLE = Collections.singletonList("a"); - private static final List RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk")); - private static final List RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba")); - - private Ordering ordering; - private List rawInput; - private int limit; - - @Parameterized.Parameters - public static Collection makeTestData() - { - Object[][] data = new Object[][]{ - {ASC, RAW_ASC, RAW_ASC.size() - 2}, - {ASC, RAW_ASC, RAW_ASC.size()}, - {ASC, RAW_ASC, RAW_ASC.size() + 2}, - {ASC, RAW_ASC, 0}, - {ASC, SINGLE, 0}, - {ASC, SINGLE, 1}, - {ASC, SINGLE, 2}, - {ASC, SINGLE, 3}, - {ASC, EMPTY, 0}, - {ASC, EMPTY, 1}, - {DESC, RAW_DESC, RAW_DESC.size() - 2}, - {DESC, RAW_DESC, RAW_DESC.size()}, - {DESC, RAW_DESC, RAW_DESC.size() + 2}, - {DESC, RAW_DESC, 0}, - {DESC, RAW_DESC, 0}, - {DESC, SINGLE, 1}, - {DESC, SINGLE, 2}, - {DESC, SINGLE, 3}, - {DESC, EMPTY, 0}, - {DESC, EMPTY, 1} - }; - - return Arrays.asList(data); - } - - public TopNSequenceTest(Ordering ordering, List rawInput, int limit) - { - this.ordering = ordering; - this.rawInput = rawInput; - this.limit = limit; - } - - @Test - public void testOrderByWithLimit() - { - List expected = rawInput.subList(0, Math.min(limit, rawInput.size())); - List inputs = Lists.newArrayList(rawInput); - Collections.shuffle(inputs, new Random(2)); - - Sequence result = new TopNSequence(Sequences.simple(inputs), ordering, limit); - - Assert.assertEquals(expected, result.toList()); - } -} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 137f9b70bd3..9c1f40c94f1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -928,6 +928,7 @@ public class DruidQuery sorting != null ? new DefaultLimitSpec( sorting.getOrderBys(), + 0, sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null ) : NoopLimitSpec.instance(), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 74901df9caf..86585d2ebdd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1566,7 +1566,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setContext(OUTER_LIMIT_CONTEXT) .build() ), - ImmutableList.of(new Object[]{"", "a", 1L}, new Object[]{"def", "abc", 1L}) + ImmutableList.of( + new Object[]{"", "a", 1L}, + new Object[]{"1", "a", 1L} + ) ); }