This commit is contained in:
Justin Borromeo 2019-02-21 17:06:18 -08:00
parent f83e99655d
commit 1813a5472c
4 changed files with 8 additions and 181 deletions

View File

@ -41,7 +41,7 @@ public class ScanQueryConfig
}
@JsonProperty
private int maxRowsQueuedForTimeOrdering = 100000;
private int maxRowsQueuedForTimeOrdering = 1;
public int getMaxRowsQueuedForTimeOrdering()
{

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import java.io.IOException;
import java.util.Map;
public class ScanQueryNoLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private Yielder<ScanResultValue> yielder;
private ScanQuery.ResultFormat resultFormat;
public ScanQueryNoLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
)
{
ScanQuery query = Druids.ScanQueryBuilder.copy((ScanQuery) queryPlus.getQuery()).limit(Long.MAX_VALUE).timeOrder(
ScanQuery.TimeOrder.NONE).build();
resultFormat = query.getResultFormat();
queryPlus = queryPlus.withQuery(query);
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasNext()
{
return !yielder.isDone();
}
@Override
public ScanResultValue next()
{
ScanResultValue batch = yielder.get();
if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
yielder = yielder.next(null);
return batch;
}
throw new UnsupportedOperationException(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
yielder.close();
}
}

View File

@ -121,94 +121,13 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
@Override
public QueryRunner<ScanResultValue> preMergeQueryDecoration(final QueryRunner<ScanResultValue> runner)
{
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext)
{
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getFilter() != null) {
scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize());
queryPlus = queryPlus.withQuery(scanQuery);
}
return runner.run(queryPlus, responseContext);
return (queryPlus, responseContext) -> {
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getFilter() != null) {
scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize());
queryPlus = queryPlus.withQuery(scanQuery);
}
return runner.run(queryPlus, responseContext);
};
}
@VisibleForTesting
Iterator<ScanResultValue> sortAndLimitScanResultValues(Iterator<ScanResultValue> inputIterator, ScanQuery scanQuery)
{
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
int limit = Math.toIntExact(scanQuery.getLimit());
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
while (inputIterator.hasNext()) {
ScanResultValue next = inputIterator.next();
List<Object> events = (List<Object>) next.getEvents();
for (Object event : events) {
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
if (q.size() > limit) {
q.poll();
}
}
}
// Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained
final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
while (q.size() != 0) {
// We add at the front of the list because poll removes the tail of the queue.
sortedElements.addFirst(q.poll());
}
return sortedElements.iterator();
}
/**
* This iterator supports iteration through any Iterable of unbatched ScanResultValues (1 event/ScanResultValue) and
* aggregates events into ScanResultValues with {@code batchSize} events. The columns from the first event per
* ScanResultValue will be used to populate the column section.
*/
private static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
{
private final Iterator<ScanResultValue> itr;
private final int batchSize;
public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
{
this.itr = iterator;
this.batchSize = batchSize;
}
@Override
public void close() throws IOException
{
}
@Override
public boolean hasNext()
{
return itr.hasNext();
}
@Override
public ScanResultValue next()
{
// Create new ScanResultValue from event map
List<Object> eventsToAdd = new ArrayList<>(batchSize);
List<String> columns = new ArrayList<>();
while (eventsToAdd.size() < batchSize && itr.hasNext()) {
ScanResultValue srv = itr.next();
// Only replace once using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
}
return new ScanResultValue(null, columns, eventsToAdd);
}
}
}

View File

@ -116,7 +116,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
));
return sortBatchAndLimitScanResultValues(queryResults, query);
} else if (numSegments <= scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()) {
// Use flatMerge strategy
// Use n-way merge strategy
return Sequences.map(
Sequences.simple(queryRunners),
(input) -> Sequences.concat(