diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
index 12c38f0d48e..43bd49f5a46 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
@@ -68,7 +68,7 @@ public class ScanQueryEngine
     if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
       long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
-      if (count >= query.getLimit()) {
+      if (count >= query.getLimit() && query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
         return Sequences.empty();
       }
     }
@@ -123,7 +123,9 @@ public class ScanQueryEngine
     if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
       responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
     }
-    final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
+    final long limit = query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) ?
+        query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) :
+        Long.MAX_VALUE;
     return Sequences.concat(
         adapter
             .makeCursors(
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java
index bef118c4784..15785f5fb59 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java
@@ -76,69 +76,24 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
       final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
-      if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
-        if (scanQuery.getLimit() == Long.MAX_VALUE) {
-          return runner.run(queryPlusWithNonNullLegacy, responseContext);
-        } else {
-          return new BaseSequence<>(
-              new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
-              {
-                @Override
-                public ScanQueryLimitRowIterator make()
-                {
-                  return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
-                }
-
-                @Override
-                public void cleanup(ScanQueryLimitRowIterator iterFromMake)
-                {
-                  CloseQuietly.close(iterFromMake);
-                }
-              });
-        }
-      } else if (scanQuery.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
-        ScanQueryNoLimitRowIterator scanResultIterator =
-            new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryNoLimitRowIterator>()
+      if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) && scanQuery.getLimit() != Long.MAX_VALUE) {
+        return new BaseSequence<>(
+            new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
             {
               @Override
-              public ScanQueryNoLimitRowIterator make()
+              public ScanQueryLimitRowIterator make()
               {
-                return new ScanQueryNoLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
+                return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
               }
 
               @Override
-              public void cleanup(ScanQueryNoLimitRowIterator iterFromMake)
-              {
-                CloseQuietly.close(iterFromMake);
-              }
-            }.make();
-
-        return new BaseSequence(
-            new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
-            {
-              @Override
-              public ScanBatchedIterator make()
-              {
-                return new ScanBatchedIterator(
-                    sortAndLimitScanResultValues(scanResultIterator, scanQuery),
-                    scanQuery.getBatchSize()
-                );
-              }
-
-              @Override
-              public void cleanup(ScanBatchedIterator iterFromMake)
+              public void cleanup(ScanQueryLimitRowIterator iterFromMake)
               {
                 CloseQuietly.close(iterFromMake);
              }
             });
-      } else {
-        throw new UOE(
-            "Time ordering for result set limit of %,d is not supported. Try lowering the "
-            + "result set size to less than or equal to the time ordering limit of %,d.",
-            scanQuery.getLimit(),
-            scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
-        );
       }
+      return runner.run(queryPlusWithNonNullLegacy, responseContext);
     };
   }
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 45fad93d909..069a49c13a1 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -19,13 +19,22 @@ package org.apache.druid.query.scan;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
@@ -34,8 +43,15 @@ import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.segment.Segment;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 
 public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
@@ -85,11 +101,38 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
         // Use normal strategy
+        return Sequences.concat(
+            Sequences.map(
+                Sequences.simple(queryRunners),
+                input -> input.run(queryPlus, responseContext)
+            )
+        );
       } else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
         // Use priority queue strategy
+        Sequence<ScanResultValue> queryResults = Sequences.concat(Sequences.map(
+            Sequences.simple(queryRunners),
+            input -> input.run(queryPlus, responseContext)
+        ));
+        return sortBatchAndLimitScanResultValues(queryResults, query);
       } else if (numSegments <= scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()) {
         // Use flatMerge strategy
+        return Sequences.map(
+            Sequences.simple(queryRunners),
+            (input) -> Sequences.concat(
+                Sequences.map(
+                    input.run(queryPlus, responseContext),
+                    srv -> Sequences.simple(srv.toSingleEventScanResultValues())
+                )
+            )
+        ).flatMerge(
+            seq -> seq,
+            Ordering.from(new ScanResultValueTimestampComparator(
+                query
+            ))
+        ).limit(
+            Math.toIntExact(query.getLimit())
+        );
       } else {
         throw new UOE(
             "Time ordering for result set limit of %,d is not supported. Try lowering the "
@@ -98,17 +141,78 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
-      return Sequences.concat(
-          Sequences.map(
-              Sequences.simple(queryRunners),
-              input -> input.run(queryPlus, responseContext)
-          )
-      );
     };
   }
 
+  @VisibleForTesting
+  Sequence<ScanResultValue> sortBatchAndLimitScanResultValues(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  )
+  {
+    Comparator<ScanResultValue> priorityQComparator =
+        new ScanResultValueTimestampComparator(scanQuery);
+
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    int limit = Math.toIntExact(scanQuery.getLimit());
+    PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
+
+    Yielder<ScanResultValue> yielder = inputSequence.toYielder(
+        null,
+        new YieldingAccumulator<ScanResultValue, ScanResultValue>()
+        {
+          @Override
+          public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
+          {
+            yield();
+            return in;
+          }
+        }
+    );
+    while (!yielder.isDone()) {
+      ScanResultValue next = yielder.get();
+      List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+      for (ScanResultValue srv : singleEventScanResultValues) {
+        // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+        // needs to be preserved for queries using the compactedList result format
+        q.offer(srv);
+        if (q.size() > limit) {
+          q.poll();
+        }
+      }
+      yielder = yielder.next(null);
+    }
+    // Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
+    // will be maintained
+    final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
+    while (q.size() != 0) {
+      // We add at the front of the list because poll removes the tail of the queue.
+      sortedElements.addFirst(q.poll());
+    }
+
+    // We can use an iterator here because all the results have been materialized for sorting
+
+    return new BaseSequence(
+        new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
+        {
+          @Override
+          public ScanBatchedIterator make()
+          {
+            return new ScanBatchedIterator(
+                sortedElements.iterator(),
+                scanQuery.getBatchSize()
+            );
+          }
+
+          @Override
+          public void cleanup(ScanBatchedIterator iterFromMake)
+          {
+            CloseQuietly.close(iterFromMake);
+          }
+        });
+  }
+
   @Override
   public QueryToolChest<ScanResultValue, ScanQuery> getToolchest()
   {
@@ -142,4 +246,46 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
+
+  private static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
+  {
+    private final Iterator<ScanResultValue> itr;
+    private final int batchSize;
+
+    public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
+    {
+      this.itr = iterator;
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return itr.hasNext();
+    }
+
+    @Override
+    public ScanResultValue next()
+    {
+      // Create new ScanResultValue from event map
+      List<Object> eventsToAdd = new ArrayList<>(batchSize);
+      List<String> columns = new ArrayList<>();
+      while (eventsToAdd.size() < batchSize && itr.hasNext()) {
+        ScanResultValue srv = itr.next();
+        // Only replace once using the columns from the first event
+        columns = columns.isEmpty() ? srv.getColumns() : columns;
+        eventsToAdd.add(Iterables.getOnlyElement((List<Object>) srv.getEvents()));
+      }
+      return new ScanResultValue(null, columns, eventsToAdd);
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
index b8a790cc41c..b32042b5b8b 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
@@ -25,6 +25,8 @@ import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.segment.column.ColumnHolder;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -85,6 +87,17 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat());
   }
 
+  public List<ScanResultValue> toSingleEventScanResultValues()
+  {
+    List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
+    List<Object> events = (List<Object>) this.getEvents();
+    for (Object event : events) {
+      singleEventScanResultValues.add(new ScanResultValue(segmentId, columns, Collections.singletonList(event)));
+    }
+    return singleEventScanResultValues;
+  }
+
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
index a85b41d6116..3c7d64053a3 100644
--- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
@@ -94,7 +94,8 @@ public class DoubleStorageTest
   private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
       scanQueryQueryToolChest,
-      new ScanQueryEngine()
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
   );
 
   private Druids.ScanQueryBuilder newTestQuery()
diff --git a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
index e138d73a1fe..7b051b0fee5 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
@@ -73,7 +73,8 @@ public class MultiSegmentScanQueryTest
   private static final QueryRunnerFactory<ScanResultValue, ScanQuery> factory = new ScanQueryRunnerFactory(
       toolChest,
-      new ScanQueryEngine()
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
   );
 
   // time modified version of druid.sample.numeric.tsv
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
index ae3c2e84227..f3427dfac63 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
@@ -122,7 +122,8 @@ public class ScanQueryRunnerTest
       QueryRunnerTestHelper.makeQueryRunners(
           new ScanQueryRunnerFactory(
               toolChest,
-              new ScanQueryEngine()
+              new ScanQueryEngine(),
+              new ScanQueryConfig()
           )
       ),
       ImmutableList.of(false, true)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 9ca3233f20d..d5ca8632a39 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -509,7 +509,8 @@ public class CalciteTests
                 new ScanQueryConfig(),
                 new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
             ),
-            new ScanQueryEngine()
+            new ScanQueryEngine(),
+            new ScanQueryConfig()
         )
     )
     .put(