diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java index 287fc62e289..4b7c590be37 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.PriorityQueue; /** + * Used to perform an n-way merge on n ordered sequences */ public class MergeSequence extends YieldingSequenceBase { @@ -43,20 +44,18 @@ public class MergeSequence extends YieldingSequenceBase this.baseSequences = (Sequence>) baseSequences; } + /* + Note: the yielder for MergeSequence returns elements from the priority queue in order of increasing priority. + This is due to the fact that PriorityQueue#remove() polls from the head of the queue which is, according to + the PriorityQueue javadoc, "the least element with respect to the specified ordering" + */ @Override public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) { PriorityQueue> pQueue = new PriorityQueue<>( 32, ordering.onResultOf( - new Function, T>() - { - @Override - public T apply(Yielder input) - { - return input.get(); - } - } + (Function, T>) input -> input.get() ) ); diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index d4b0519c494..35420c4d4a6 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -20,16 +20,11 @@ package org.apache.druid.query.scan; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; -import 
com.google.common.collect.Iterables; import com.google.inject.Inject; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.CloseQuietly; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; @@ -38,17 +33,6 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.aggregation.MetricManipulationFn; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Deque; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; - public class ScanQueryQueryToolChest extends QueryToolChest { private static final TypeReference TYPE_REFERENCE = new TypeReference() diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 15ceb16ed7a..aa0a00d9d23 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -20,23 +20,24 @@ package org.apache.druid.query.scan; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.UOE; +import 
org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.common.guava.YieldingSequenceBase; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; @@ -45,11 +46,10 @@ import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.segment.Segment; +import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Comparator; -import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -134,20 +134,27 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory seq, Ordering.from(new ScanResultValueTimestampComparator( query - )).reverse() // TODO Figure out why this needs to be reversed + )).reverse() // This needs to be reversed because MergeSequence yields the least element (per the given ordering) first; presumably the comparator's order is the inverse of the requested time order - TODO confirm ).limit( Math.toIntExact(query.getLimit()) ); // Batch the scan result values - } else { + return new ScanResultValueBatchingSequence(unbatched, query.getBatchSize()); + } else if (query.getLimit() > scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) { throw new UOE( - "Time ordering for result set limit of %,d is not supported. Try lowering the " - + "result set size to less than or equal to the time ordering limit of %,d.", + "Time ordering for query result set limit of %,d is not supported. 
Try lowering the result " + + "set size to less than or equal to the configurable time ordering limit of %,d rows.", query.getLimit(), scanQueryConfig.getMaxRowsQueuedForTimeOrdering() ); } + throw new UOE( + "Time ordering for queries of %,d segments per historical is not supported. Try reducing the scope " + + "of the query to scan fewer segments than the configurable time ordering limit of %,d segments", + numSegments, + scanQueryConfig.getMaxSegmentsTimeOrderedInMemory() + ); }; } @@ -157,13 +164,14 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory priorityQComparator = - new ScanResultValueTimestampComparator(scanQuery); + Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) int limit = Math.toIntExact(scanQuery.getLimit()); - PriorityQueue q = new PriorityQueue<>(limit, priorityQComparator); + + // Comparator ordering is reversed since polling from the queue returns elements in reversed order + PriorityQueue q = new PriorityQueue<>(limit, priorityQComparator.reversed()); Yielder yielder = inputSequence.toYielder( null, @@ -192,28 +200,25 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory sortedElements = new ArrayDeque<>(q.size()); + final List sortedElements = new ArrayList<>(q.size()); while (q.size() != 0) { - // We add at the front of the list because poll removes the tail of the queue. 
- sortedElements.addFirst(q.poll()); + sortedElements.add(q.poll()); } - // We can use an iterator here because all the results have been materialized for sorting - return new BaseSequence( - new BaseSequence.IteratorMaker() + new BaseSequence.IteratorMaker() { @Override - public ScanBatchedIterator make() + public ScanBatchedIteratorIterator make() { - return new ScanBatchedIterator( + return new ScanBatchedIteratorIterator( sortedElements.iterator(), scanQuery.getBatchSize() ); } @Override - public void cleanup(ScanBatchedIterator iterFromMake) + public void cleanup(ScanBatchedIteratorIterator iterFromMake) { CloseQuietly.close(iterFromMake); } @@ -260,12 +265,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory + static class ScanBatchedIteratorIterator implements CloseableIterator { private final Iterator itr; private final int batchSize; - public ScanBatchedIterator(Iterator iterator, int batchSize) + public ScanBatchedIteratorIterator(Iterator iterator, int batchSize) { this.itr = iterator; this.batchSize = batchSize; @@ -297,4 +302,83 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory + { + Yielder inputYielder; + int batchSize; + + public ScanResultValueBatchingSequence(Sequence inputSequence, int batchSize) { + this.inputYielder = inputSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in) + { + yield(); + return in; + } + } + ); + this.batchSize = batchSize; + } + + @Override + @Nullable + public Yielder toYielder( + OutType initValue, YieldingAccumulator accumulator + ) + { + return makeYielder(initValue, accumulator); + } + + private Yielder makeYielder( + OutType initVal, + final YieldingAccumulator accumulator + ) + { + return new Yielder() + { + @Override + public OutType get() + { + // Create new ScanResultValue from event map + List eventsToAdd = new ArrayList<>(batchSize); + List columns = new ArrayList<>(); + 
while (eventsToAdd.size() < batchSize && !inputYielder.isDone()) { + ScanResultValue srv = inputYielder.get(); + // Only replace once using the columns from the first event + columns = columns.isEmpty() ? srv.getColumns() : columns; + eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents())); + inputYielder = inputYielder.next(null); + } + try { + return (OutType) new ScanResultValue(null, columns, eventsToAdd); + } catch (ClassCastException e) { + return initVal; + } + } + + @Override + public Yielder next(OutType initValue) + { + accumulator.reset(); + return makeYielder(initValue, accumulator); + } + + @Override + public boolean isDone() + { + return inputYielder.isDone(); + } + + @Override + public void close() + { + } + }; + } + } }