diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 840906c31c5..45fad93d909 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.Query; @@ -45,15 +46,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory() - { - @Override - public Sequence run( - final QueryPlus queryPlus, final Map responseContext - ) - { - int numSegments = 0; - final Iterator> segmentIt = queryRunners.iterator(); - for (; segmentIt.hasNext(); numSegments++) { - segmentIt.next(); - } - // Note: this variable is effective only when queryContext has a timeout. - // See the comment of CTX_TIMEOUT_AT. - final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery()); - responseContext.put(CTX_TIMEOUT_AT, timeoutAt); - return Sequences.concat( - Sequences.map( - Sequences.simple(queryRunners), - new Function, Sequence>() - { - @Override - public Sequence apply(final QueryRunner input) - { - return input.run(queryPlus, responseContext); - } - } - ) + return (queryPlus, responseContext) -> { + ScanQuery query = (ScanQuery) queryPlus.getQuery(); + int numSegments = 0; + final Iterator> segmentIt = queryRunners.iterator(); + for (; segmentIt.hasNext(); numSegments++) { + segmentIt.next(); + } + // Note: this variable is effective only when queryContext has a timeout. + // See the comment of CTX_TIMEOUT_AT. + final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery()); + responseContext.put(CTX_TIMEOUT_AT, timeoutAt); + if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) { + // Use existing strategy + } else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) { + // Use priority queue strategy + } else if (numSegments <= scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()) { + // Use flatMerge strategy + } else { + throw new UOE( + "Time ordering for result set limit of %,d is not supported. Try lowering the " + + "result set size to less than or equal to the time ordering limit of %,d.", + query.getLimit(), + scanQueryConfig.getMaxRowsQueuedForTimeOrdering() ); } + + return Sequences.concat( + Sequences.map( + Sequences.simple(queryRunners), + input -> input.run(queryPlus, responseContext) + ) + ); + }; }