mirror of https://github.com/apache/druid.git
Created an error message for when someone tries to time order a result
set > threshold limit
This commit is contained in:
parent
e8a4b49044
commit
7e872a8ebc
|
@ -48,186 +48,193 @@ import java.util.PriorityQueue;
|
||||||
|
|
||||||
public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
|
public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
|
||||||
{
|
{
|
||||||
private static final TypeReference<ScanResultValue> TYPE_REFERENCE = new TypeReference<ScanResultValue>()
|
private static final TypeReference<ScanResultValue> TYPE_REFERENCE = new TypeReference<ScanResultValue>()
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
|
|
||||||
private final ScanQueryConfig scanQueryConfig;
|
private final ScanQueryConfig scanQueryConfig;
|
||||||
private final GenericQueryMetricsFactory queryMetricsFactory;
|
private final GenericQueryMetricsFactory queryMetricsFactory;
|
||||||
private final long maxRowsForInMemoryTimeOrdering;
|
private final long maxRowsForInMemoryTimeOrdering;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ScanQueryQueryToolChest(
|
public ScanQueryQueryToolChest(
|
||||||
final ScanQueryConfig scanQueryConfig,
|
final ScanQueryConfig scanQueryConfig,
|
||||||
final GenericQueryMetricsFactory queryMetricsFactory
|
final GenericQueryMetricsFactory queryMetricsFactory
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.scanQueryConfig = scanQueryConfig;
|
this.scanQueryConfig = scanQueryConfig;
|
||||||
this.queryMetricsFactory = queryMetricsFactory;
|
this.queryMetricsFactory = queryMetricsFactory;
|
||||||
this.maxRowsForInMemoryTimeOrdering = scanQueryConfig.getMaxRowsTimeOrderedInMemory();
|
this.maxRowsForInMemoryTimeOrdering = scanQueryConfig.getMaxRowsTimeOrderedInMemory();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
|
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
|
||||||
{
|
{
|
||||||
return (queryPlus, responseContext) -> {
|
return (queryPlus, responseContext) -> {
|
||||||
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
|
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
|
||||||
// the same way, even if they have different default legacy values.
|
// the same way, even if they have different default legacy values.
|
||||||
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
|
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
|
||||||
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
|
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
|
||||||
final BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker =
|
final BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker =
|
||||||
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
|
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public ScanQueryLimitRowIterator make()
|
public ScanQueryLimitRowIterator make()
|
||||||
{
|
{
|
||||||
return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
|
return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanup(ScanQueryLimitRowIterator iterFromMake)
|
public void cleanup(ScanQueryLimitRowIterator iterFromMake)
|
||||||
{
|
{
|
||||||
CloseQuietly.close(iterFromMake);
|
CloseQuietly.close(iterFromMake);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) ||
|
if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE)) {
|
||||||
scanQuery.getLimit() > maxRowsForInMemoryTimeOrdering) {
|
if (scanQuery.getLimit() == Long.MAX_VALUE) {
|
||||||
if (scanQuery.getLimit() == Long.MAX_VALUE) {
|
return runner.run(queryPlusWithNonNullLegacy, responseContext);
|
||||||
return runner.run(queryPlusWithNonNullLegacy, responseContext);
|
}
|
||||||
}
|
return new BaseSequence<ScanResultValue, ScanQueryLimitRowIterator>(scanQueryLimitRowIteratorMaker);
|
||||||
return new BaseSequence<ScanResultValue, ScanQueryLimitRowIterator>(scanQueryLimitRowIteratorMaker);
|
} else if ((scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
|
||||||
} else if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
|
scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING))
|
||||||
scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
|
&& scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
|
||||||
Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
|
Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
|
||||||
|
|
||||||
return new BaseSequence(
|
return new BaseSequence(
|
||||||
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedQueueIterator>()
|
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedQueueIterator>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public ScanBatchedTimeOrderedQueueIterator make()
|
public ScanBatchedTimeOrderedQueueIterator make()
|
||||||
{
|
{
|
||||||
return new ScanBatchedTimeOrderedQueueIterator(
|
return new ScanBatchedTimeOrderedQueueIterator(
|
||||||
heapsortScanResultValues(scanResultIterator, scanQuery),
|
heapsortScanResultValues(scanResultIterator, scanQuery),
|
||||||
scanQuery.getBatchSize()
|
scanQuery.getBatchSize()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake)
|
public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake)
|
||||||
{
|
{
|
||||||
CloseQuietly.close(iterFromMake);
|
CloseQuietly.close(iterFromMake);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else if (scanQuery.getLimit() > scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
|
||||||
throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder());
|
throw new UOE(
|
||||||
}
|
"Time ordering for result set limit of %s is not supported. Try lowering the "
|
||||||
};
|
+ "result set size to less than or equal to the time ordering limit of %s.",
|
||||||
}
|
scanQuery.getLimit(),
|
||||||
|
scanQueryConfig.getMaxRowsTimeOrderedInMemory()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueryMetrics<Query<?>> makeMetrics(ScanQuery query)
|
public QueryMetrics<Query<?>> makeMetrics(ScanQuery query)
|
||||||
{
|
{
|
||||||
return queryMetricsFactory.makeMetrics(query);
|
return queryMetricsFactory.makeMetrics(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Function<ScanResultValue, ScanResultValue> makePreComputeManipulatorFn(
|
public Function<ScanResultValue, ScanResultValue> makePreComputeManipulatorFn(
|
||||||
ScanQuery query,
|
ScanQuery query,
|
||||||
MetricManipulationFn fn
|
MetricManipulationFn fn
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return Functions.identity();
|
return Functions.identity();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TypeReference<ScanResultValue> getResultTypeReference()
|
public TypeReference<ScanResultValue> getResultTypeReference()
|
||||||
{
|
{
|
||||||
return TYPE_REFERENCE;
|
return TYPE_REFERENCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueryRunner<ScanResultValue> preMergeQueryDecoration(final QueryRunner<ScanResultValue> runner)
|
public QueryRunner<ScanResultValue> preMergeQueryDecoration(final QueryRunner<ScanResultValue> runner)
|
||||||
{
|
{
|
||||||
return new QueryRunner<ScanResultValue>()
|
return new QueryRunner<ScanResultValue>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext)
|
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext)
|
||||||
{
|
{
|
||||||
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
|
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
|
||||||
if (scanQuery.getFilter() != null) {
|
if (scanQuery.getFilter() != null) {
|
||||||
scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize());
|
scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize());
|
||||||
queryPlus = queryPlus.withQuery(scanQuery);
|
queryPlus = queryPlus.withQuery(scanQuery);
|
||||||
}
|
}
|
||||||
return runner.run(queryPlus, responseContext);
|
return runner.run(queryPlus, responseContext);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Iterator<ScanResultValue> heapsortScanResultValues(Iterator<ScanResultValue> inputIterator, ScanQuery scanQuery)
|
Iterator<ScanResultValue> heapsortScanResultValues(Iterator<ScanResultValue> inputIterator, ScanQuery scanQuery)
|
||||||
{
|
{
|
||||||
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
|
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
|
||||||
|
|
||||||
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
|
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
|
||||||
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
|
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
|
||||||
|
|
||||||
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
|
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
|
||||||
|
|
||||||
while (inputIterator.hasNext()) {
|
while (inputIterator.hasNext()) {
|
||||||
|
|
||||||
ScanResultValue next = inputIterator.next();
|
ScanResultValue next = inputIterator.next();
|
||||||
List<Object> events = (List<Object>) next.getEvents();
|
List<Object> events = (List<Object>) next.getEvents();
|
||||||
for (Object event : events) {
|
for (Object event : events) {
|
||||||
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
|
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
|
||||||
// needs to be preserved for queries using the compactedList result format
|
// needs to be preserved for queries using the compactedList result format
|
||||||
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
|
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
|
// Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
|
||||||
// will be maintained
|
// will be maintained
|
||||||
List<ScanResultValue> sortedElements = new ArrayList<>(q.size());
|
List<ScanResultValue> sortedElements = new ArrayList<>(q.size());
|
||||||
while (q.size() != 0) {
|
while (q.size() != 0) {
|
||||||
sortedElements.add(q.poll());
|
sortedElements.add(q.poll());
|
||||||
}
|
}
|
||||||
return sortedElements.iterator();
|
return sortedElements.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ScanBatchedTimeOrderedQueueIterator implements CloseableIterator<ScanResultValue>
|
private class ScanBatchedTimeOrderedQueueIterator implements CloseableIterator<ScanResultValue>
|
||||||
{
|
{
|
||||||
private final Iterator<ScanResultValue> itr;
|
private final Iterator<ScanResultValue> itr;
|
||||||
private final int batchSize;
|
private final int batchSize;
|
||||||
|
|
||||||
public ScanBatchedTimeOrderedQueueIterator(Iterator<ScanResultValue> iterator, int batchSize)
|
public ScanBatchedTimeOrderedQueueIterator(Iterator<ScanResultValue> iterator, int batchSize)
|
||||||
{
|
{
|
||||||
this.itr = iterator;
|
this.itr = iterator;
|
||||||
this.batchSize = batchSize;
|
this.batchSize = batchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext()
|
public boolean hasNext()
|
||||||
{
|
{
|
||||||
return itr.hasNext();
|
return itr.hasNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ScanResultValue next()
|
public ScanResultValue next()
|
||||||
{
|
{
|
||||||
// Create new scanresultvalue from event map
|
// Create new scanresultvalue from event map
|
||||||
List<Object> eventsToAdd = new ArrayList<>(batchSize);
|
List<Object> eventsToAdd = new ArrayList<>(batchSize);
|
||||||
List<String> columns = new ArrayList<>();
|
List<String> columns = new ArrayList<>();
|
||||||
while (eventsToAdd.size() < batchSize && itr.hasNext()) {
|
while (eventsToAdd.size() < batchSize && itr.hasNext()) {
|
||||||
ScanResultValue srv = itr.next();
|
ScanResultValue srv = itr.next();
|
||||||
// Only replace once using the columns from the first event
|
// Only replace once using the columns from the first event
|
||||||
columns = columns.isEmpty() ? srv.getColumns() : columns;
|
columns = columns.isEmpty() ? srv.getColumns() : columns;
|
||||||
eventsToAdd.add(((List) srv.getEvents()).get(0));
|
eventsToAdd.add(((List) srv.getEvents()).get(0));
|
||||||
}
|
}
|
||||||
return new ScanResultValue(null, columns, eventsToAdd);
|
return new ScanResultValue(null, columns, eventsToAdd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue