Add "offset" parameter to GroupBy query. (#10235)

* Add "offset" parameter to GroupBy query.

It works by doing the query as normal and then throwing away the first
"offset" number of rows on the broker.

* Stabilize GroupBy sorts.

* Fix inspections.

* Fix suppression.

* Fixups.

* Move TopNSequence to druid-core.

* Addl comments.

* NumberedElement equals verification.

* Changes from review.
This commit is contained in:
Gian Merlino 2020-08-05 15:39:58 -07:00 committed by GitHub
parent 9a29496b6c
commit b6aaf59e8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1066 additions and 250 deletions

View File

@ -28,6 +28,16 @@
Reference: https://github.com/apache/druid/pull/7894/files Reference: https://github.com/apache/druid/pull/7894/files
--> -->
<FindBugsFilter> <FindBugsFilter>
<!-- Ignore "equals" bugs for JsonInclude filter classes. They rely on strange-looking "equals" methods. -->
<Match>
<And>
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
<Or>
<Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/>
</Or>
</And>
</Match>
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/> <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/> <Bug pattern="BC_UNCONFIRMED_CAST"/>
<Bug pattern="BIT_SIGNED_CHECK_HIGH_BIT"/> <Bug pattern="BIT_SIGNED_CHECK_HIGH_BIT"/>

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;

import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
/**
 * Simultaneously sorts and limits its input.
 *
 * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
 * Stability comes from tagging every offered element with its arrival number and using that number as a
 * tie-breaker after the caller-supplied comparator.
 *
 * Not thread-safe.
 *
 * Note: the sorting behavior of this class doesn't have its own unit tests. It is tested along with
 * {@link org.apache.druid.java.util.common.guava.TopNSequence} in "TopNSequenceTest".
 */
public class StableLimitingSorter<T>
{
  private final MinMaxPriorityQueue<NumberedElement<T>> queue;

  // Number of elements offered so far; doubles as the arrival number handed to the next element.
  private long count = 0;

  /**
   * @param comparator ordering applied to the elements; ties are broken by arrival order
   * @param limit      maximum number of elements retained. It is passed straight to
   *                   {@code MinMaxPriorityQueue.Builder#maximumSize}.
   *                   NOTE(review): Guava documents that this size must be positive, so callers presumably
   *                   have to special-case a limit of zero themselves; confirm against the callers.
   */
  public StableLimitingSorter(final Comparator<T> comparator, final int limit)
  {
    this.queue = MinMaxPriorityQueue
        .orderedBy(
            Ordering.from(
                Comparator.<NumberedElement<T>, T>comparing(NumberedElement::getElement, comparator)
                    .thenComparing(NumberedElement::getNumber)
            )
        )
        .maximumSize(limit)
        .create();
  }

  /**
   * Offer an element to the sorter.
   */
  public void add(T element)
  {
    queue.offer(new NumberedElement<>(element, count++));
  }

  /**
   * Drain elements in sorted order (least first).
   *
   * The returned iterator reads directly from this sorter's internal queue and removes each element as it is
   * returned, so the sorter is empty once the iterator is exhausted.
   */
  public Iterator<T> drain()
  {
    return new Iterator<T>()
    {
      @Override
      public boolean hasNext()
      {
        return !queue.isEmpty();
      }

      @Override
      public T next()
      {
        final NumberedElement<T> head = queue.poll();
        if (head == null) {
          // poll() returns null on an empty queue; honor the Iterator contract instead of surfacing an NPE.
          throw new NoSuchElementException();
        }
        return head.getElement();
      }

      @Override
      public void remove()
      {
        throw new UnsupportedOperationException();
      }
    };
  }

  /**
   * An element paired with the order in which it was offered to the sorter.
   */
  @VisibleForTesting
  static class NumberedElement<T>
  {
    private final T element;
    private final long number;

    public NumberedElement(T element, long number)
    {
      this.element = element;
      this.number = number;
    }

    public T getElement()
    {
      return element;
    }

    public long getNumber()
    {
      return number;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      NumberedElement<?> that = (NumberedElement<?>) o;
      return number == that.number &&
             Objects.equals(element, that.element);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(element, number);
    }
  }
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.java.util.common.guava; package org.apache.druid.java.util.common.guava;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
@ -85,8 +86,21 @@ public interface Sequence<T>
return accumulate(new ArrayList<>(), Accumulators.list()); return accumulate(new ArrayList<>(), Accumulators.list());
} }
/**
 * Returns a sequence that omits the first {@code skip} elements of this one.
 *
 * A skip of zero returns this sequence itself rather than a wrapper.
 *
 * @param skip number of leading elements to drop; must not be negative
 * @throws IllegalArgumentException if {@code skip} is negative
 */
default Sequence<T> skip(long skip)
{
  Preconditions.checkArgument(skip >= 0, "skip >= 0");

  if (skip == 0) {
    // Nothing to drop, so no wrapper is needed.
    return this;
  }

  return new SkippingSequence<>(this, skip);
}
default Sequence<T> limit(long limit) default Sequence<T> limit(long limit)
{ {
Preconditions.checkArgument(limit >= 0, "limit >= 0");
return new LimitedSequence<>(this, limit); return new LimitedSequence<>(this, limit);
} }

View File

@ -32,6 +32,7 @@ import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
*
*/ */
public class Sequences public class Sequences
{ {
@ -131,10 +132,18 @@ public class Sequences
}; };
} }
// This will materialize the entire sequence in memory. Use at your own risk. /**
* Returns a sorted copy of the provided sequence.
*
* This will materialize the entire sequence in memory. Use at your own risk.
*
* The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
*/
public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator) public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
{ {
List<T> seqList = sequence.toList(); List<T> seqList = sequence.toList();
// Note: Collections.sort is guaranteed to be stable.
Collections.sort(seqList, comparator); Collections.sort(seqList, comparator);
return simple(seqList); return simple(seqList);
} }

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava;
import org.apache.druid.java.util.common.IAE;
import java.io.IOException;
/**
 * A Sequence that drops a fixed number of leading elements from a base sequence and passes the rest through.
 */
public class SkippingSequence<T> extends YieldingSequenceBase<T>
{
  private final Sequence<T> baseSequence;
  private final long skip;

  /**
   * @param baseSequence sequence whose leading elements are dropped
   * @param skip         number of elements to drop; must be at least one
   */
  public SkippingSequence(Sequence<T> baseSequence, long skip)
  {
    if (skip < 1) {
      throw new IAE("'skip' must be greater than zero");
    }

    this.baseSequence = baseSequence;
    this.skip = skip;
  }

  @Override
  public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
  {
    // Each call gets its own accumulator wrapper, and therefore its own count of skipped elements.
    final SkippingYieldingAccumulator<OutType> skipper = new SkippingYieldingAccumulator<>(accumulator);
    final Yielder<OutType> baseYielder = baseSequence.toYielder(initValue, skipper);
    return wrapYielder(baseYielder, skipper);
  }

  /**
   * Wraps a yielder of the base sequence in one that forwards every call to it, re-wrapping the result of
   * {@code next} so the same skipping accumulator accompanies the whole chain.
   */
  private <OutType> Yielder<OutType> wrapYielder(
      final Yielder<OutType> delegate,
      final SkippingYieldingAccumulator<OutType> skipper
  )
  {
    return new Yielder<OutType>()
    {
      @Override
      public OutType get()
      {
        return delegate.get();
      }

      @Override
      public Yielder<OutType> next(OutType initValue)
      {
        final Yielder<OutType> following = delegate.next(initValue);
        return wrapYielder(following, skipper);
      }

      @Override
      public boolean isDone()
      {
        return delegate.isDone();
      }

      @Override
      public void close() throws IOException
      {
        delegate.close();
      }
    };
  }

  /**
   * Accumulator that ignores the first {@code skip} elements it is handed and forwards all later ones.
   */
  private class SkippingYieldingAccumulator<OutType> extends DelegatingYieldingAccumulator<OutType, T>
  {
    // How many elements have been dropped so far.
    private long skipped = 0;

    public SkippingYieldingAccumulator(final YieldingAccumulator<OutType, T> accumulator)
    {
      super(accumulator);
    }

    @Override
    public OutType accumulate(OutType accumulated, T in)
    {
      if (skipped >= skip) {
        return super.accumulate(accumulated, in);
      }

      // Still within the skipped prefix: count the element and leave the accumulated value untouched.
      skipped++;
      return accumulated;
    }
  }
}

View File

@ -17,22 +17,24 @@
* under the License. * under the License.
*/ */
package org.apache.druid.query.groupby.orderby; package org.apache.druid.java.util.common.guava;
import com.google.common.collect.MinMaxPriorityQueue; import org.apache.druid.collections.StableLimitingSorter;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
/**
* Simultaneously sorts and limits its input.
*
* The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
*/
public class TopNSequence<T> extends BaseSequence<T, Iterator<T>> public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
{ {
public TopNSequence( public TopNSequence(
final Sequence<T> input, final Sequence<T> input,
final Ordering<T> ordering, final Comparator<T> ordering,
final int limit final int limit
) )
{ {
@ -47,45 +49,18 @@ public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
} }
// Materialize the topN values // Materialize the topN values
final MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue final StableLimitingSorter<T> sorter = new StableLimitingSorter<>(ordering, limit);
.orderedBy(ordering)
.maximumSize(limit)
.create();
input.accumulate( input.accumulate(
queue, sorter,
new Accumulator<MinMaxPriorityQueue<T>, T>() (theSorter, element) -> {
{ theSorter.add(element);
@Override return theSorter;
public MinMaxPriorityQueue<T> accumulate(MinMaxPriorityQueue<T> theQueue, T row)
{
theQueue.offer(row);
return theQueue;
}
} }
); );
// Now return them when asked // Now return them when asked
return new Iterator<T>() return sorter.drain();
{
@Override
public boolean hasNext()
{
return !queue.isEmpty();
}
@Override
public T next()
{
return queue.poll();
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
} }
@Override @Override

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
/**
 * Checks the equals/hashCode contract of {@link StableLimitingSorter.NumberedElement}.
 */
public class StableLimitingSorterTest
{
  @Test
  public void testEquals()
  {
    // NumberedElement.equals compares with getClass(), so the verifier is told to expect that style.
    EqualsVerifier
        .forClass(StableLimitingSorter.NumberedElement.class)
        .usingGetClass()
        .verify();
  }
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class SkippingSequenceTest
{
private static final List<Integer> NUMS = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
@Test
public void testSanityAccumulate() throws Exception
{
final int threshold = 5;
SequenceTestHelper.testAll(
Sequences.simple(NUMS).skip(threshold),
Lists.newArrayList(Iterables.skip(NUMS, threshold))
);
}
@Test
public void testTwo() throws Exception
{
final int threshold = 2;
SequenceTestHelper.testAll(
Sequences.simple(NUMS).skip(threshold),
Lists.newArrayList(Iterables.skip(NUMS, threshold))
);
}
@Test
public void testOne() throws Exception
{
final int threshold = 1;
SequenceTestHelper.testAll(
Sequences.simple(NUMS).skip(threshold),
Lists.newArrayList(Iterables.skip(NUMS, threshold))
);
}
@Test
public void testLimitThenSkip() throws Exception
{
final int skip = 2;
final int limit = 4;
SequenceTestHelper.testAll(
Sequences.simple(NUMS).limit(limit).skip(skip),
Lists.newArrayList(Iterables.skip(Iterables.limit(NUMS, limit), skip))
);
}
@Test
public void testWithYieldingSequence()
{
// Create a Sequence whose Yielders will yield for each element, regardless of what the accumulator passed
// to "toYielder" does.
final BaseSequence<Integer, Iterator<Integer>> sequence = new BaseSequence<Integer, Iterator<Integer>>(
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
{
@Override
public Iterator<Integer> make()
{
return NUMS.iterator();
}
@Override
public void cleanup(Iterator<Integer> iterFromMake)
{
// Do nothing.
}
}
)
{
@Override
public <OutType> Yielder<OutType> toYielder(
final OutType initValue,
final YieldingAccumulator<OutType, Integer> accumulator
)
{
return super.toYielder(
initValue,
new DelegatingYieldingAccumulator<OutType, Integer>(accumulator)
{
@Override
public OutType accumulate(OutType accumulated, Integer in)
{
final OutType retVal = super.accumulate(accumulated, in);
yield();
return retVal;
}
}
);
}
};
final int threshold = 4;
// Can't use "testAll" because its "testYield" implementation depends on the underlying Sequence _not_ yielding.
SequenceTestHelper.testAccumulation(
"",
sequence.skip(threshold),
Lists.newArrayList(Iterables.skip(NUMS, threshold))
);
}
@Test
public void testNoSideEffects()
{
final List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
final AtomicLong accumulated = new AtomicLong(0);
final Sequence<Integer> seq = Sequences.simple(
Iterables.transform(
nums,
input -> {
accumulated.addAndGet(input);
return input;
}
)
).limit(5);
Assert.assertEquals(10, seq.accumulate(0, new SkippingSequenceTest.IntAdditionAccumulator()).intValue());
Assert.assertEquals(10, accumulated.get());
Assert.assertEquals(10, seq.accumulate(0, new SkippingSequenceTest.IntAdditionAccumulator()).intValue());
Assert.assertEquals(20, accumulated.get());
}
private static class IntAdditionAccumulator implements Accumulator<Integer, Integer>
{
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
return accumulated + in;
}
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
/**
 * Tests for {@link TopNSequence}: ordering with a limit, and stability when the comparator reports ties.
 */
@RunWith(Enclosed.class)
public class TopNSequenceTest
{
  private static final List<String> EMPTY = Collections.emptyList();
  private static final List<String> SINGLE = Collections.singletonList("a");
  private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
  private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));

  /**
   * Runs ascending and descending orderings over inputs of several sizes with limits below, at and above the
   * input size.
   */
  @RunWith(Parameterized.class)
  public static class TopNSequenceAscDescTest
  {
    private static final Ordering<String> ASC = Ordering.natural();
    private static final Ordering<String> DESC = Ordering.natural().reverse();

    private final Ordering<String> ordering;
    private final List<String> rawInput;
    private final int limit;

    @Parameterized.Parameters(name = "comparator={0}, rawInput={1}, limit={2}")
    public static Collection<Object[]> makeTestData()
    {
      // The DESC rows mirror the ASC rows one for one.
      Object[][] data = new Object[][]{
          {ASC, RAW_ASC, RAW_ASC.size() - 2},
          {ASC, RAW_ASC, RAW_ASC.size()},
          {ASC, RAW_ASC, RAW_ASC.size() + 2},
          {ASC, RAW_ASC, 0},
          {ASC, SINGLE, 0},
          {ASC, SINGLE, 1},
          {ASC, SINGLE, 2},
          {ASC, SINGLE, 3},
          {ASC, EMPTY, 0},
          {ASC, EMPTY, 1},
          {DESC, RAW_DESC, RAW_DESC.size() - 2},
          {DESC, RAW_DESC, RAW_DESC.size()},
          {DESC, RAW_DESC, RAW_DESC.size() + 2},
          {DESC, RAW_DESC, 0},
          {DESC, SINGLE, 0},
          {DESC, SINGLE, 1},
          {DESC, SINGLE, 2},
          {DESC, SINGLE, 3},
          {DESC, EMPTY, 0},
          {DESC, EMPTY, 1}
      };

      return Arrays.asList(data);
    }

    public TopNSequenceAscDescTest(Ordering<String> ordering, List<String> rawInput, int limit)
    {
      this.ordering = ordering;
      this.rawInput = rawInput;
      this.limit = limit;
    }

    @Test
    public void testOrderByWithLimit()
    {
      // rawInput is already in the order the comparator produces, so the expected result is simply its prefix.
      List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));

      // Shuffle with a fixed seed so the sequence has real sorting work to do, reproducibly.
      List<String> inputs = Lists.newArrayList(rawInput);
      Collections.shuffle(inputs, new Random(2));

      Sequence<String> result = new TopNSequence<>(Sequences.simple(inputs), ordering, limit);

      Assert.assertEquals(expected, result.toList());
    }
  }

  /**
   * This class has test cases using a comparator that sometimes returns zero for unequal things.
   */
  @RunWith(Parameterized.class)
  public static class TopNSequenceEvenOddTest
  {
    // 'a', 'c', 'e', ... all come before 'b', 'd', 'f', ...
    private static final Ordering<String> EVENODD = Ordering.from(Comparator.comparing(s -> 1 - s.charAt(0) % 2));

    private final String expected;
    private final List<String> rawInput;

    @Parameterized.Parameters(name = "rawInput={1}")
    public static Collection<Object[]> makeTestData()
    {
      Object[][] data = new Object[][]{
          {"acegikbdfhj", RAW_ASC},
          {"kigecajhfdb", RAW_DESC}
      };

      return Arrays.asList(data);
    }

    public TopNSequenceEvenOddTest(String expected, List<String> rawInput)
    {
      this.expected = expected;
      this.rawInput = rawInput;
    }

    @Test
    public void testStability()
    {
      // Verify that the output of the sequence is stable relative to the input.
      for (int limit = 0; limit < expected.length() + 1; limit++) {
        final TopNSequence<String> sequence = new TopNSequence<>(Sequences.simple(rawInput), EVENODD, limit);
        Assert.assertEquals(
            "limit = " + limit,
            expected.substring(0, Math.min(limit, expected.length())),
            Joiner.on("").join(sequence.toList())
        );
      }
    }
  }
}

View File

@ -35,11 +35,24 @@ The default limit spec takes a limit and the list of columns to do an orderBy op
```json ```json
{ {
"type" : "default", "type" : "default",
"limit" : <integer_value>, "limit" : <optional integer>,
"columns" : [list of OrderByColumnSpec], "offset" : <optional integer>,
"columns" : [<optional list of OrderByColumnSpec>]
} }
``` ```
The "limit" parameter is the maximum number of rows to return.
The "offset" parameter tells Druid to skip this many rows when returning results. If both "limit" and "offset" are
provided, then "offset" will be applied first, followed by "limit". For example, a spec with limit 100 and offset 10
will return 100 rows starting from row number 10. Internally, the query is executed by extending the limit by the offset
and then discarding a number of rows equal to the offset. This means that raising the offset will increase resource
usage by an amount similar to increasing the limit.
Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is
modified in between page fetches in ways that affect overall query results, then the different pages will not
necessarily align with each other.
#### OrderByColumnSpec #### OrderByColumnSpec
OrderByColumnSpecs indicate how to do order by operations. Each order-by condition can be a `jsonString` or a map of the following form: OrderByColumnSpecs indicate how to do order by operations. Each order-by condition can be a `jsonString` or a map of the following form:

View File

@ -132,7 +132,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs, @JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs, @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") @Nullable HavingSpec havingSpec, @JsonProperty("having") @Nullable HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec, @JsonProperty("limitSpec") @Nullable LimitSpec limitSpec,
@JsonProperty("subtotalsSpec") @Nullable List<List<String>> subtotalsSpec, @JsonProperty("subtotalsSpec") @Nullable List<List<String>> subtotalsSpec,
@JsonProperty("context") Map<String, Object> context @JsonProperty("context") Map<String, Object> context
) )
@ -183,7 +183,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
final @Nullable List<AggregatorFactory> aggregatorSpecs, final @Nullable List<AggregatorFactory> aggregatorSpecs,
final @Nullable List<PostAggregator> postAggregatorSpecs, final @Nullable List<PostAggregator> postAggregatorSpecs,
final @Nullable HavingSpec havingSpec, final @Nullable HavingSpec havingSpec,
final LimitSpec limitSpec, final @Nullable LimitSpec limitSpec,
final @Nullable List<List<String>> subtotalsSpec, final @Nullable List<List<String>> subtotalsSpec,
final @Nullable Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn, final @Nullable Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn,
final Map<String, Object> context final Map<String, Object> context
@ -483,10 +483,10 @@ public class GroupByQuery extends BaseQuery<ResultRow>
final boolean forceLimitPushDown = validateAndGetForceLimitPushDown(); final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();
if (limitSpec instanceof DefaultLimitSpec) { if (limitSpec instanceof DefaultLimitSpec) {
DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec; DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit();
// If only applying an orderby without a limit, don't try to push down // If only applying an orderby without a limit, don't try to push down
if (!defaultLimitSpec.isLimited()) { if (!limitSpecWithoutOffset.isLimited()) {
return false; return false;
} }
@ -1153,7 +1153,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) { if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
theLimitSpec = NoopLimitSpec.instance(); theLimitSpec = NoopLimitSpec.instance();
} else { } else {
theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit); theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, 0, limit);
} }
} else { } else {
theLimitSpec = limitSpec; theLimitSpec = limitSpec;

View File

@ -621,6 +621,9 @@ public class GroupByQueryEngineV2
} }
if (canDoLimitPushdown) { if (canDoLimitPushdown) {
// Sanity check; must not have "offset" at this point.
Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
LimitedBufferHashGrouper<ByteBuffer> limitGrouper = new LimitedBufferHashGrouper<>( LimitedBufferHashGrouper<ByteBuffer> limitGrouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(buffer), Suppliers.ofInstance(buffer),
keySerde, keySerde,

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockInputStream;
@ -98,6 +99,9 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.keyObjComparator = keySerdeFactory.objectComparator(false); this.keyObjComparator = keySerdeFactory.objectComparator(false);
this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true); this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true);
if (limitSpec != null) { if (limitSpec != null) {
// Sanity check; must not have "offset" at this point.
Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
LimitedBufferHashGrouper<KeyType> limitGrouper = new LimitedBufferHashGrouper<>( LimitedBufferHashGrouper<KeyType> limitGrouper = new LimitedBufferHashGrouper<>(
bufferSupplier, bufferSupplier,
keySerde, keySerde,

View File

@ -20,14 +20,15 @@
package org.apache.druid.query.groupby.orderby; package org.apache.druid.query.groupby.orderby;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Functions; import com.google.common.base.Functions;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Rows; import org.apache.druid.data.input.Rows;
@ -35,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.TopNSequence;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.dimension.DimensionSpec;
@ -47,11 +49,14 @@ import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -63,8 +68,14 @@ public class DefaultLimitSpec implements LimitSpec
private static final byte CACHE_KEY = 0x1; private static final byte CACHE_KEY = 0x1;
private final List<OrderByColumnSpec> columns; private final List<OrderByColumnSpec> columns;
private final int offset;
private final int limit; private final int limit;
public static Builder builder()
{
return new Builder();
}
/** /**
* Check if a limitSpec has columns in the sorting order that are not part of the grouping fields represented * Check if a limitSpec has columns in the sorting order that are not part of the grouping fields represented
* by `dimensions`. * by `dimensions`.
@ -98,13 +109,28 @@ public class DefaultLimitSpec implements LimitSpec
@JsonCreator @JsonCreator
public DefaultLimitSpec( public DefaultLimitSpec(
@JsonProperty("columns") List<OrderByColumnSpec> columns, @JsonProperty("columns") List<OrderByColumnSpec> columns,
@JsonProperty("offset") Integer offset,
@JsonProperty("limit") Integer limit @JsonProperty("limit") Integer limit
) )
{ {
this.columns = (columns == null) ? ImmutableList.of() : columns; this.columns = (columns == null) ? ImmutableList.of() : columns;
this.offset = (offset == null) ? 0 : offset;
this.limit = (limit == null) ? Integer.MAX_VALUE : limit; this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit); Preconditions.checkArgument(this.offset >= 0, "offset[%s] must be >= 0", this.offset);
Preconditions.checkArgument(this.limit > 0, "limit[%s] must be > 0", this.limit);
}
/**
* Constructor that does not accept "offset". Useful for tests that only want to provide "columns" and "limit".
*/
@VisibleForTesting
public DefaultLimitSpec(
final List<OrderByColumnSpec> columns,
final Integer limit
)
{
this(columns, 0, limit);
} }
@JsonProperty @JsonProperty
@ -113,12 +139,32 @@ public class DefaultLimitSpec implements LimitSpec
return columns; return columns;
} }
/**
* Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid.
*/
@JsonProperty @JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getOffset()
{
return offset;
}
/**
* Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Integer#MAX_VALUE} is used in
* situations where the user wants an effectively unlimited resultset.
*/
@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitJsonIncludeFilter.class)
public int getLimit() public int getLimit()
{ {
return limit; return limit;
} }
/**
 * Returns true when this spec has an offset greater than zero.
 */
public boolean isOffset()
{
  return 0 < offset;
}
public boolean isLimited() public boolean isLimited()
{ {
return limit < Integer.MAX_VALUE; return limit < Integer.MAX_VALUE;
@ -174,10 +220,9 @@ public class DefaultLimitSpec implements LimitSpec
sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst(); sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst();
} }
if (!sortingNeeded) { final Function<Sequence<ResultRow>, Sequence<ResultRow>> sortAndLimitFn;
return isLimited() ? new LimitingFn(limit) : Functions.identity();
}
if (sortingNeeded) {
// Materialize the Comparator first for fast-fail error checking. // Materialize the Comparator first for fast-fail error checking.
final Ordering<ResultRow> ordering = makeComparator( final Ordering<ResultRow> ordering = makeComparator(
query.getResultRowSignature(), query.getResultRowSignature(),
@ -188,10 +233,26 @@ public class DefaultLimitSpec implements LimitSpec
query.getContextSortByDimsFirst() query.getContextSortByDimsFirst()
); );
// Both branches use a stable sort; important so consistent results are returned from query to query if the
// underlying data isn't changing. (Useful for query reproducibility and offset-based pagination.)
if (isLimited()) { if (isLimited()) {
return new TopNFunction(ordering, limit); sortAndLimitFn = results -> new TopNSequence<>(results, ordering, limit + offset);
} else { } else {
return new SortingFn(ordering); sortAndLimitFn = results -> Sequences.sort(results, ordering).limit(limit + offset);
}
} else {
if (isLimited()) {
sortAndLimitFn = results -> results.limit(limit + offset);
} else {
sortAndLimitFn = Functions.identity();
}
}
// Finally, apply offset after sorting and limiting.
if (isOffset()) {
return results -> sortAndLimitFn.apply(results).skip(offset);
} else {
return sortAndLimitFn;
} }
} }
@ -217,10 +278,38 @@ public class DefaultLimitSpec implements LimitSpec
{ {
return new DefaultLimitSpec( return new DefaultLimitSpec(
columns.stream().filter(c -> names.contains(c.getDimension())).collect(Collectors.toList()), columns.stream().filter(c -> names.contains(c.getDimension())).collect(Collectors.toList()),
offset,
limit limit
); );
} }
/**
 * Folds the offset into the limit. The returned spec has the same columns, an offset of zero, and a limit equal to
 * the original limit plus the original offset. Intended for queries passed down to lower levels of the stack: only
 * the highest level applies the offset, so any pushed-down limit has to be widened to leave room for it.
 *
 * If there is no offset, this object is returned as-is. An unlimited spec ({@link Integer#MAX_VALUE}) stays
 * unlimited. An ISE is thrown when limit plus offset does not fit in an int.
 */
public DefaultLimitSpec withOffsetToLimit()
{
  if (!isOffset()) {
    return this;
  }

  if (limit == Integer.MAX_VALUE) {
    // Unlimited stays unlimited.
    return new DefaultLimitSpec(columns, 0, Integer.MAX_VALUE);
  }

  if (limit > Integer.MAX_VALUE - offset) {
    // The sum cannot be represented; fail rather than wrap around.
    throw new ISE("Cannot apply limit[%d] with offset[%d] due to overflow", limit, offset);
  }

  return new DefaultLimitSpec(columns, 0, limit + offset);
}
private Ordering<ResultRow> makeComparator( private Ordering<ResultRow> makeComparator(
RowSignature rowSignature, RowSignature rowSignature,
boolean hasTimestamp, boolean hasTimestamp,
@ -331,60 +420,11 @@ public class DefaultLimitSpec implements LimitSpec
{ {
return "DefaultLimitSpec{" + return "DefaultLimitSpec{" +
"columns='" + columns + '\'' + "columns='" + columns + '\'' +
", offset=" + offset +
", limit=" + limit + ", limit=" + limit +
'}'; '}';
} }
private static class LimitingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
{
private final int limit;
public LimitingFn(int limit)
{
this.limit = limit;
}
@Override
public Sequence<ResultRow> apply(Sequence<ResultRow> input)
{
return input.limit(limit);
}
}
private static class SortingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
{
private final Ordering<ResultRow> ordering;
public SortingFn(Ordering<ResultRow> ordering)
{
this.ordering = ordering;
}
@Override
public Sequence<ResultRow> apply(@Nullable Sequence<ResultRow> input)
{
return Sequences.sort(input, ordering);
}
}
private static class TopNFunction implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
{
private final Ordering<ResultRow> ordering;
private final int limit;
public TopNFunction(Ordering<ResultRow> ordering, int limit)
{
this.ordering = ordering;
this.limit = limit;
}
@Override
public Sequence<ResultRow> apply(final Sequence<ResultRow> input)
{
return new TopNSequence<>(input, ordering, limit);
}
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -394,25 +434,16 @@ public class DefaultLimitSpec implements LimitSpec
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
DefaultLimitSpec that = (DefaultLimitSpec) o; DefaultLimitSpec that = (DefaultLimitSpec) o;
return offset == that.offset &&
if (limit != that.limit) { limit == that.limit &&
return false; Objects.equals(columns, that.columns);
}
if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
return false;
}
return true;
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
int result = columns != null ? columns.hashCode() : 0; return Objects.hash(columns, offset, limit);
result = 31 * result + limit;
return result;
} }
@Override @Override
@ -427,12 +458,83 @@ public class DefaultLimitSpec implements LimitSpec
++index; ++index;
} }
ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 4) ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 2 * Integer.BYTES)
.put(CACHE_KEY); .put(CACHE_KEY);
for (byte[] columnByte : columnBytes) { for (byte[] columnByte : columnBytes) {
buffer.put(columnByte); buffer.put(columnByte);
} }
buffer.put(Ints.toByteArray(limit));
buffer.putInt(limit);
buffer.putInt(offset);
return buffer.array(); return buffer.array();
} }
/**
 * Fluent builder for {@link DefaultLimitSpec}. Columns default to an empty list; offset and limit default to null,
 * which is passed through to the DefaultLimitSpec constructor unchanged.
 */
public static class Builder
{
  private List<OrderByColumnSpec> columns = Collections.emptyList();
  private Integer offset;
  private Integer limit;

  private Builder()
  {
  }

  /**
   * Orders by the given column names, each in ascending direction. Replaces any previously set ordering.
   */
  public Builder orderBy(final String... columns)
  {
    final OrderByColumnSpec[] specs = new OrderByColumnSpec[columns.length];
    for (int i = 0; i < columns.length; i++) {
      specs[i] = new OrderByColumnSpec(columns[i], OrderByColumnSpec.Direction.ASCENDING);
    }
    return orderBy(specs);
  }

  /**
   * Orders by the given column specs. Replaces any previously set ordering.
   */
  public Builder orderBy(final OrderByColumnSpec... columns)
  {
    this.columns = ImmutableList.copyOf(columns);
    return this;
  }

  public Builder offset(final int offset)
  {
    this.offset = offset;
    return this;
  }

  public Builder limit(final int limit)
  {
    this.limit = limit;
    return this;
  }

  public DefaultLimitSpec build()
  {
    return new DefaultLimitSpec(columns, offset, limit);
  }
}
/**
 * {@link JsonInclude} filter for {@link #getLimit()}.
 *
 * With {@code JsonInclude.Include.CUSTOM}, the property is left out of the serialized form when this filter's
 * {@code equals} returns true for the property value. The intent is to omit the "unlimited" sentinel.
 *
 * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
 * exclusions (see spotbugs-exclude.xml).
 */
@SuppressWarnings("EqualsAndHashcode")
static class LimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
{
  @Override
  public boolean equals(Object obj)
  {
    if (obj == null) {
      return false;
    }

    if (obj.getClass() == this.getClass()) {
      return true;
    }

    // getLimit() returns an int, so the value handed to the filter is a boxed Integer. Matching only Long (as this
    // method previously did) meant the filter never fired and the sentinel was always written out.
    if (obj instanceof Integer) {
      return ((Integer) obj).intValue() == Integer.MAX_VALUE;
    }

    // Retained so that Long-typed sentinels keep comparing the way they did before.
    return obj instanceof Long && ((Long) obj).longValue() == Long.MAX_VALUE;
  }
}
} }

View File

@ -61,6 +61,7 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor; import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.LimitSpec; import org.apache.druid.query.groupby.orderby.LimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.resource.GroupByQueryResource; import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@ -221,7 +222,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
query.getPostAggregatorSpecs(), query.getPostAggregatorSpecs(),
// Don't do "having" clause until the end of this method. // Don't do "having" clause until the end of this method.
null, null,
query.getLimitSpec(), // Potentially pass limit down the stack (i.e. limit pushdown). Notes:
// (1) Limit pushdown is only supported for DefaultLimitSpec.
// (2) When pushing down a limit, it must be extended to include the offset (the offset will be applied
// higher-up).
query.isApplyLimitPushDown() ? ((DefaultLimitSpec) query.getLimitSpec()).withOffsetToLimit() : null,
query.getSubtotalsSpec(), query.getSubtotalsSpec(),
query.getContext() query.getContext()
).withOverriddenContext( ).withOverriddenContext(

View File

@ -606,9 +606,16 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
List<ResultRow> expectedResults = Arrays.asList( List<ResultRow> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t3t3", "count", 4L), GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t3t3", "count", 4L),
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t5t5", "count", 4L), GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t5t5", "count", 4L),
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t4t4", "count", 2L), GroupByQueryRunnerTestHelper.createExpectedRow(
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L), query,
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t7t7", "count", 2L) "1970",
"texpr",
NullHandling.emptyToNullIfNeeded(""),
"count",
2L
),
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t1t1", "count", 2L),
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L)
); );
TestHelper.assertExpectedObjects(expectedResults, result.toList(), "expr-multi-multi-auto-auto-self"); TestHelper.assertExpectedObjects(expectedResults, result.toList(), "expr-multi-multi-auto-auto-self");

View File

@ -3043,14 +3043,16 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
} }
@Test @Test
public void testMergeResultsWithLimit() public void testMergeResultsWithLimitAndOffset()
{ {
for (int limit = 1; limit < 20; ++limit) { for (int limit = 1; limit < 20; ++limit) {
doTestMergeResultsWithValidLimit(limit); for (int offset = 0; offset < 21; ++offset) {
doTestMergeResultsWithValidLimit(limit, offset);
}
} }
} }
private void doTestMergeResultsWithValidLimit(final int limit) private void doTestMergeResultsWithValidLimit(final int limit, final int offset)
{ {
GroupByQuery.Builder builder = makeQueryBuilder() GroupByQuery.Builder builder = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
@ -3058,7 +3060,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDimensions(new DefaultDimensionSpec("quality", "alias")) .setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setLimit(limit); .setLimitSpec(DefaultLimitSpec.builder().limit(limit).offset(offset).build());
final GroupByQuery fullQuery = builder.build(); final GroupByQuery fullQuery = builder.build();
@ -3158,7 +3160,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
QueryRunner<ResultRow> mergeRunner = factory.getToolchest().mergeResults(runner); QueryRunner<ResultRow> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects( TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit), Iterables.limit(Iterables.skip(expectedResults, offset), limit),
mergeRunner.run(QueryPlus.wrap(fullQuery)), mergeRunner.run(QueryPlus.wrap(fullQuery)),
StringUtils.format("limit: %d", limit) StringUtils.format("limit: %d", limit)
); );
@ -3700,7 +3702,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query, query,
"1970-01-01T00:00:00.000Z", "1970-01-01T00:00:00.000Z",
"market", "market",
"upfront", "total_market",
QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@ -3710,7 +3712,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query, query,
"1970-01-01T00:00:00.000Z", "1970-01-01T00:00:00.000Z",
"market", "market",
"total_market", "upfront",
QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@ -3867,7 +3869,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query, query,
"1970-01-01T00:00:00.000Z", "1970-01-01T00:00:00.000Z",
"market", "market",
"upfront", "total_market",
QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@ -3877,7 +3879,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query, query,
"1970-01-01T00:00:00.000Z", "1970-01-01T00:00:00.000Z",
"market", "market",
"total_market", "upfront",
QueryRunnerTestHelper.UNIQUE_METRIC, QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@ -7198,10 +7200,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
ImmutableList.of("market"), ImmutableList.of("market"),
ImmutableList.of() ImmutableList.of()
)) ))
.addOrderByColumn("idx") .setLimitSpec(DefaultLimitSpec.builder().limit(3).orderBy("idx", "alias", "market").build())
.addOrderByColumn("alias")
.addOrderByColumn("market")
.setLimit(3)
.build(); .build();
List<ResultRow> expectedResults = Arrays.asList( List<ResultRow> expectedResults = Arrays.asList(
@ -7214,6 +7213,44 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit"); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit");
} }
/**
 * Runs a groupBy with a subtotals spec ([alias], [market], []) and a limit spec that orders by idx, alias, market
 * with limit 2 and offset 1, then compares the rows returned against a fixed expectation.
 */
@Test
public void testGroupByWithSubtotalsSpecWithOrderLimitAndOffset()
{
// Only exercised for the v2 strategy; for any other strategy the test returns without asserting anything.
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(Lists.newArrayList(
new DefaultDimensionSpec("quality", "alias"),
new DefaultDimensionSpec("market", "market")
))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.setSubtotalsSpec(ImmutableList.of(
ImmutableList.of("alias"),
ImmutableList.of("market"),
ImmutableList.of()
))
// The String... overload of orderBy sorts every named column in ascending direction.
.setLimitSpec(DefaultLimitSpec.builder().limit(2).offset(1).orderBy("idx", "alias", "market").build())
.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit");
}
@Test @Test
public void testGroupByWithTimeColumn() public void testGroupByWithTimeColumn()
{ {
@ -9860,6 +9897,55 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
} }
/**
 * Runs a groupBy on the market dimension with limit push-down forced through the query context and a limit spec
 * that sorts by "marketalias" descending with offset 1 and limit 2, then compares the rows returned against a fixed
 * expectation.
 */
@Test
public void testGroupByLimitPushDownWithOffset()
{
// Only exercised for the v2 strategy; for any other strategy the test returns without asserting anything.
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN).setDimensions(new DefaultDimensionSpec(
QueryRunnerTestHelper.MARKET_DIMENSION,
"marketalias"
))
.setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.setLimitSpec(
// Positional arguments are (columns, offset, limit): offset = 1, limit = 2.
new DefaultLimitSpec(
Collections.singletonList(new OrderByColumnSpec(
"marketalias",
OrderByColumnSpec.Direction.DESCENDING
)),
1,
2
)
).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(
query,
"1970-01-01T00:00:00.000Z",
"marketalias",
"total_market",
"rows",
186L
),
makeRow(
query,
"1970-01-01T00:00:00.000Z",
"marketalias",
"spot",
"rows",
837L
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
@Test @Test
public void testGroupByLimitPushDownWithLongDimensionNotInLimitSpec() public void testGroupByLimitPushDownWithLongDimensionNotInLimitSpec()
{ {
@ -10076,6 +10162,72 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged"); TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
} }
/**
 * Merges the results of two single-day runs of the same groupBy (granularity ALL, limit push-down forced through
 * the query context) whose limit spec sorts by "idx" descending with offset 2 and limit 3, then compares the merged
 * rows against a fixed expectation.
 */
@Test
public void testMergeResultsWithLimitPushDownSortByAggWithOffset()
{
// Only exercised for the v2 strategy; for any other strategy the test returns without asserting anything.
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
GroupByQuery.Builder builder = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
.setLimitSpec(
// Positional arguments are (columns, offset, limit): offset = 2, limit = 3.
new DefaultLimitSpec(
Collections.singletonList(new OrderByColumnSpec("idx", OrderByColumnSpec.Direction.DESCENDING)),
2,
3
)
)
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
.setGranularity(Granularities.ALL);
final GroupByQuery allGranQuery = builder.build();
// Outer mergeResults wraps a runner that itself merges two per-day runs of the underlying runner.
QueryRunner mergedRunner = factory.getToolchest().mergeResults(
new QueryRunner<ResultRow>()
{
@Override
public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext responseContext)
{
// simulate two daily segments
final QueryPlus<ResultRow> queryPlus1 = queryPlus.withQuery(
queryPlus.getQuery().withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03")))
)
);
final QueryPlus<ResultRow> queryPlus2 = queryPlus.withQuery(
queryPlus.getQuery().withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04")))
)
);
// Inner mergeResults combines the two per-day sequences using the query's own result ordering.
return factory.getToolchest().mergeResults(
(queryPlus3, responseContext1) -> new MergeSequence<>(
queryPlus3.getQuery().getResultOrdering(),
Sequences.simple(
Arrays.asList(
runner.run(queryPlus1, responseContext1),
runner.run(queryPlus2, responseContext1)
)
)
)
).run(queryPlus, responseContext);
}
}
);
List<ResultRow> allGranExpectedResults = Arrays.asList(
makeRow(allGranQuery, "2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
makeRow(allGranQuery, "2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
makeRow(allGranQuery, "2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
);
Iterable<ResultRow> results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList();
TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
}
@Test @Test
public void testMergeResultsWithLimitPushDownSortByDimDim() public void testMergeResultsWithLimitPushDownSortByDimDim()
{ {

View File

@ -36,7 +36,9 @@ import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List; import java.util.List;
@ -45,6 +47,9 @@ import java.util.List;
*/ */
public class DefaultLimitSpecTest public class DefaultLimitSpecTest
{ {
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private final List<ResultRow> testRowsList; private final List<ResultRow> testRowsList;
private final List<ResultRow> testRowsWithTimestampList; private final List<ResultRow> testRowsWithTimestampList;
@ -271,4 +276,36 @@ public class DefaultLimitSpecTest
limitFn.apply(Sequences.simple(testRowsList)).toList() limitFn.apply(Sequences.simple(testRowsList)).toList()
); );
} }
@Test
public void testWithOffsetToLimit()
{
  // limit 1 + offset 2 should fold into a plain limit of 3 with no offset.
  final DefaultLimitSpec expected = DefaultLimitSpec.builder().orderBy("abc").limit(3).build();
  final DefaultLimitSpec actual =
      DefaultLimitSpec.builder().orderBy("abc").limit(1).offset(2).build().withOffsetToLimit();

  Assert.assertEquals(expected, actual);
}
@Test
public void testWithOffsetToLimitUnlimited()
{
  // With no limit set, folding the offset away should leave a spec equal to one built with no limit and no offset.
  final DefaultLimitSpec expected = DefaultLimitSpec.builder().orderBy("abc").build();
  final DefaultLimitSpec actual = DefaultLimitSpec.builder().orderBy("abc").offset(2).build().withOffsetToLimit();

  Assert.assertEquals(expected, actual);
}
@Test
public void testWithOffsetToLimitTooCloseToMaxValue()
{
  // (Integer.MAX_VALUE - 1) + 2 does not fit in an int, so the conversion is expected to fail.
  final int nearMaxLimit = Integer.MAX_VALUE - 1;
  final DefaultLimitSpec nearMaxSpec = DefaultLimitSpec.builder()
                                                       .orderBy("abc")
                                                       .limit(nearMaxLimit)
                                                       .offset(2)
                                                       .build();

  expectedException.expect(IllegalStateException.class);
  expectedException.expectMessage("Cannot apply limit[2147483646] with offset[2] due to overflow");

  nearMaxSpec.withOffsetToLimit();
}
} }

View File

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.orderby;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
/**
 * Parameterized test for {@link TopNSequence}. Each case supplies an ordering, an input list that is already sorted
 * according to that ordering, and a limit. The input is shuffled with a fixed seed, passed through TopNSequence, and
 * the result is expected to equal the first {@code limit} elements of the sorted input (or all of it when the limit
 * exceeds the input size).
 */
@RunWith(Parameterized.class)
public class TopNSequenceTest
{
  private static final Ordering<String> ASC = Ordering.natural();
  private static final Ordering<String> DESC = Ordering.natural().reverse();

  private static final List<String> EMPTY = Collections.emptyList();
  private static final List<String> SINGLE = Collections.singletonList("a");
  private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
  private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));

  private final Ordering<String> ordering;
  private final List<String> rawInput;
  private final int limit;

  /**
   * Builds the (ordering, sorted input, limit) cases. The ASC and DESC halves mirror each other case for case.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> makeTestData()
  {
    Object[][] data = new Object[][]{
        {ASC, RAW_ASC, RAW_ASC.size() - 2},
        {ASC, RAW_ASC, RAW_ASC.size()},
        {ASC, RAW_ASC, RAW_ASC.size() + 2},
        {ASC, RAW_ASC, 0},
        {ASC, SINGLE, 0},
        {ASC, SINGLE, 1},
        {ASC, SINGLE, 2},
        {ASC, SINGLE, 3},
        {ASC, EMPTY, 0},
        {ASC, EMPTY, 1},
        {DESC, RAW_DESC, RAW_DESC.size() - 2},
        {DESC, RAW_DESC, RAW_DESC.size()},
        {DESC, RAW_DESC, RAW_DESC.size() + 2},
        {DESC, RAW_DESC, 0},
        // Previously a second copy of {DESC, RAW_DESC, 0}; the single-element, zero-limit case was never run for DESC.
        {DESC, SINGLE, 0},
        {DESC, SINGLE, 1},
        {DESC, SINGLE, 2},
        {DESC, SINGLE, 3},
        {DESC, EMPTY, 0},
        {DESC, EMPTY, 1}
    };

    return Arrays.asList(data);
  }

  public TopNSequenceTest(Ordering<String> ordering, List<String> rawInput, int limit)
  {
    this.ordering = ordering;
    this.rawInput = rawInput;
    this.limit = limit;
  }

  @Test
  public void testOrderByWithLimit()
  {
    // rawInput is already in the target order, so its prefix is the expected answer.
    List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));

    // Shuffle a copy with a fixed seed so the run is reproducible and the shared constants are not mutated.
    List<String> inputs = Lists.newArrayList(rawInput);
    Collections.shuffle(inputs, new Random(2));

    Sequence<String> result = new TopNSequence<>(Sequences.simple(inputs), ordering, limit);

    Assert.assertEquals(expected, result.toList());
  }
}

View File

@ -928,6 +928,7 @@ public class DruidQuery
sorting != null sorting != null
? new DefaultLimitSpec( ? new DefaultLimitSpec(
sorting.getOrderBys(), sorting.getOrderBys(),
0,
sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null
) )
: NoopLimitSpec.instance(), : NoopLimitSpec.instance(),

View File

@ -1566,7 +1566,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setContext(OUTER_LIMIT_CONTEXT) .setContext(OUTER_LIMIT_CONTEXT)
.build() .build()
), ),
ImmutableList.of(new Object[]{"", "a", 1L}, new Object[]{"def", "abc", 1L}) ImmutableList.of(
new Object[]{"", "a", 1L},
new Object[]{"1", "a", 1L}
)
); );
} }