From 7e872a8ebcea0d3a141addd122dd9f8b6629ead6 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Wed, 6 Feb 2019 15:36:24 -0800 Subject: [PATCH] Created an error message for when someone tries to time order a result set > threshold limit --- .../query/scan/ScanQueryQueryToolChest.java | 329 +++++++++--------- 1 file changed, 168 insertions(+), 161 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 77ac6206be2..7b8d341698c 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -48,186 +48,193 @@ import java.util.PriorityQueue; public class ScanQueryQueryToolChest extends QueryToolChest { - private static final TypeReference TYPE_REFERENCE = new TypeReference() - { - }; + private static final TypeReference TYPE_REFERENCE = new TypeReference() + { + }; - private final ScanQueryConfig scanQueryConfig; - private final GenericQueryMetricsFactory queryMetricsFactory; - private final long maxRowsForInMemoryTimeOrdering; + private final ScanQueryConfig scanQueryConfig; + private final GenericQueryMetricsFactory queryMetricsFactory; + private final long maxRowsForInMemoryTimeOrdering; - @Inject - public ScanQueryQueryToolChest( - final ScanQueryConfig scanQueryConfig, - final GenericQueryMetricsFactory queryMetricsFactory - ) - { - this.scanQueryConfig = scanQueryConfig; - this.queryMetricsFactory = queryMetricsFactory; - this.maxRowsForInMemoryTimeOrdering = scanQueryConfig.getMaxRowsTimeOrderedInMemory(); - } + @Inject + public ScanQueryQueryToolChest( + final ScanQueryConfig scanQueryConfig, + final GenericQueryMetricsFactory queryMetricsFactory + ) + { + this.scanQueryConfig = scanQueryConfig; + this.queryMetricsFactory = queryMetricsFactory; + this.maxRowsForInMemoryTimeOrdering = scanQueryConfig.getMaxRowsTimeOrderedInMemory(); + } - @Override - public QueryRunner mergeResults(final QueryRunner runner) - { - return (queryPlus, responseContext) -> { - // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it - // the same way, even if they have different default legacy values. - final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig); - final QueryPlus queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); - final BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker = - new BaseSequence.IteratorMaker() - { - @Override - public ScanQueryLimitRowIterator make() - { - return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); - } + @Override + public QueryRunner mergeResults(final QueryRunner runner) + { + return (queryPlus, responseContext) -> { + // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it + // the same way, even if they have different default legacy values. + final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig); + final QueryPlus queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); + final BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker = + new BaseSequence.IteratorMaker() + { + @Override + public ScanQueryLimitRowIterator make() + { + return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); + } - @Override - public void cleanup(ScanQueryLimitRowIterator iterFromMake) - { - CloseQuietly.close(iterFromMake); - } - }; + @Override + public void cleanup(ScanQueryLimitRowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + }; - if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) || - scanQuery.getLimit() > maxRowsForInMemoryTimeOrdering) { - if (scanQuery.getLimit() == Long.MAX_VALUE) { - return runner.run(queryPlusWithNonNullLegacy, responseContext); - } - return new BaseSequence(scanQueryLimitRowIteratorMaker); - } else if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) || - scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) { - Iterator scanResultIterator = scanQueryLimitRowIteratorMaker.make(); + if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE)) { + if (scanQuery.getLimit() == Long.MAX_VALUE) { + return runner.run(queryPlusWithNonNullLegacy, responseContext); + } + return new BaseSequence(scanQueryLimitRowIteratorMaker); + } else if ((scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) || + scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) + && scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) { + Iterator scanResultIterator = scanQueryLimitRowIteratorMaker.make(); - return new BaseSequence( - new BaseSequence.IteratorMaker() - { - @Override - public ScanBatchedTimeOrderedQueueIterator make() - { - return new ScanBatchedTimeOrderedQueueIterator( - heapsortScanResultValues(scanResultIterator, scanQuery), - scanQuery.getBatchSize() - ); - } + return new BaseSequence( + new BaseSequence.IteratorMaker() + { + @Override + public ScanBatchedTimeOrderedQueueIterator make() + { + return new ScanBatchedTimeOrderedQueueIterator( + heapsortScanResultValues(scanResultIterator, scanQuery), + scanQuery.getBatchSize() + ); + } - @Override - public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake) - { - CloseQuietly.close(iterFromMake); - } - }); - } else { - throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder()); - } - }; - } + @Override + public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + }); + } else if (scanQuery.getLimit() > scanQueryConfig.getMaxRowsTimeOrderedInMemory()) { + throw new UOE( + "Time ordering for result set limit of %s is not supported. Try lowering the " + + "result set size to less than or equal to the time ordering limit of %s.", + scanQuery.getLimit(), + scanQueryConfig.getMaxRowsTimeOrderedInMemory() + ); + } else { + throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder()); + } + }; + } - @Override - public QueryMetrics> makeMetrics(ScanQuery query) - { - return queryMetricsFactory.makeMetrics(query); - } + @Override + public QueryMetrics> makeMetrics(ScanQuery query) + { + return queryMetricsFactory.makeMetrics(query); + } - @Override - public Function makePreComputeManipulatorFn( - ScanQuery query, - MetricManipulationFn fn - ) - { - return Functions.identity(); - } + @Override + public Function makePreComputeManipulatorFn( + ScanQuery query, + MetricManipulationFn fn + ) + { + return Functions.identity(); + } - @Override - public TypeReference getResultTypeReference() - { - return TYPE_REFERENCE; - } + @Override + public TypeReference getResultTypeReference() + { + return TYPE_REFERENCE; + } - @Override - public QueryRunner preMergeQueryDecoration(final QueryRunner runner) - { - return new QueryRunner() - { - @Override - public Sequence run(QueryPlus queryPlus, Map responseContext) - { - ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); - if (scanQuery.getFilter() != null) { - scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize()); - queryPlus = queryPlus.withQuery(scanQuery); - } - return runner.run(queryPlus, responseContext); - } - }; - } + @Override + public QueryRunner preMergeQueryDecoration(final QueryRunner runner) + { + return new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); + if (scanQuery.getFilter() != null) { + scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize()); + queryPlus = queryPlus.withQuery(scanQuery); + } + return runner.run(queryPlus, responseContext); + } + }; + } - @VisibleForTesting - Iterator heapsortScanResultValues(Iterator inputIterator, ScanQuery scanQuery) - { - Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); + @VisibleForTesting + Iterator heapsortScanResultValues(Iterator inputIterator, ScanQuery scanQuery) + { + Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); - // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch - // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) + // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch + // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) - PriorityQueue q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator); + PriorityQueue q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator); - while (inputIterator.hasNext()) { + while (inputIterator.hasNext()) { - ScanResultValue next = inputIterator.next(); - List events = (List) next.getEvents(); - for (Object event : events) { - // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list - // needs to be preserved for queries using the compactedList result format - q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event))); - } - } - // Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order - // will be maintained - List sortedElements = new ArrayList<>(q.size()); - while (q.size() != 0) { - sortedElements.add(q.poll()); - } - return sortedElements.iterator(); - } + ScanResultValue next = inputIterator.next(); + List events = (List) next.getEvents(); + for (Object event : events) { + // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list + // needs to be preserved for queries using the compactedList result format + q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event))); + } + } + // Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order + // will be maintained + List sortedElements = new ArrayList<>(q.size()); + while (q.size() != 0) { + sortedElements.add(q.poll()); + } + return sortedElements.iterator(); + } - private class ScanBatchedTimeOrderedQueueIterator implements CloseableIterator - { - private final Iterator itr; - private final int batchSize; + private class ScanBatchedTimeOrderedQueueIterator implements CloseableIterator + { + private final Iterator itr; + private final int batchSize; - public ScanBatchedTimeOrderedQueueIterator(Iterator iterator, int batchSize) - { - this.itr = iterator; - this.batchSize = batchSize; - } + public ScanBatchedTimeOrderedQueueIterator(Iterator iterator, int batchSize) + { + this.itr = iterator; + this.batchSize = batchSize; + } - @Override - public void close() throws IOException - { - } + @Override + public void close() throws IOException + { + } - @Override - public boolean hasNext() - { - return itr.hasNext(); - } + @Override + public boolean hasNext() + { + return itr.hasNext(); + } - @Override - public ScanResultValue next() - { - // Create new scanresultvalue from event map - List eventsToAdd = new ArrayList<>(batchSize); - List columns = new ArrayList<>(); - while (eventsToAdd.size() < batchSize && itr.hasNext()) { - ScanResultValue srv = itr.next(); - // Only replace once using the columns from the first event - columns = columns.isEmpty() ? srv.getColumns() : columns; - eventsToAdd.add(((List) srv.getEvents()).get(0)); - } - return new ScanResultValue(null, columns, eventsToAdd); - } - } + @Override + public ScanResultValue next() + { + // Create new scanresultvalue from event map + List eventsToAdd = new ArrayList<>(batchSize); + List columns = new ArrayList<>(); + while (eventsToAdd.size() < batchSize && itr.hasNext()) { + ScanResultValue srv = itr.next(); + // Only replace once using the columns from the first event + columns = columns.isEmpty() ? srv.getColumns() : columns; + eventsToAdd.add(((List) srv.getEvents()).get(0)); + } + return new ScanResultValue(null, columns, eventsToAdd); + } + } }