Refactor and pQueue works

Justin Borromeo 2019-02-21 16:56:36 -08:00
parent b13ff624a9
commit f83e99655d
8 changed files with 187 additions and 67 deletions

View File: ScanQueryEngine.java

@@ -68,7 +68,7 @@ public class ScanQueryEngine
     if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
       long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
-      if (count >= query.getLimit()) {
+      if (count >= query.getLimit() && query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
         return Sequences.empty();
       }
     }
@@ -123,7 +123,9 @@ public class ScanQueryEngine
     if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
       responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
     }
-    final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
+    final long limit = query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) ?
+        query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) :
+        Long.MAX_VALUE;
     return Sequences.concat(
         adapter
             .makeCursors(

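A note on the two changes above: both guard the engine's early-exit and limit-pushdown optimizations behind a check for TimeOrder.NONE. When time ordering is requested, the engine cannot stop scanning early, because every row must reach the sort before any can be discarded; the limit is applied downstream once results are ordered. A minimal sketch of that rule (illustrative names, not Druid code):

final class EffectiveLimitSketch
{
  // Without time ordering the engine only needs the rows remaining under the limit;
  // with time ordering it must scan everything and let the sort apply the limit later.
  static long effectiveLimit(boolean timeOrdered, long queryLimit, long alreadyReturned)
  {
    return timeOrdered ? Long.MAX_VALUE : queryLimit - alreadyReturned;
  }

  public static void main(String[] args)
  {
    System.out.println(effectiveLimit(false, 100, 40)); // 60: only 60 more rows are needed
    System.out.println(effectiveLimit(true, 100, 40));  // Long.MAX_VALUE: scan everything
  }
}
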
View File: ScanQueryQueryToolChest.java

@@ -76,69 +76,24 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
     // the same way, even if they have different default legacy values.
     final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
     final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
-    if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
-      if (scanQuery.getLimit() == Long.MAX_VALUE) {
-        return runner.run(queryPlusWithNonNullLegacy, responseContext);
-      } else {
-        return new BaseSequence<>(
-            new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
-            {
-              @Override
-              public ScanQueryLimitRowIterator make()
-              {
-                return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
-              }
-              @Override
-              public void cleanup(ScanQueryLimitRowIterator iterFromMake)
-              {
-                CloseQuietly.close(iterFromMake);
-              }
-            });
-      }
-    } else if (scanQuery.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
-      ScanQueryNoLimitRowIterator scanResultIterator =
-          new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryNoLimitRowIterator>()
+    if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) && scanQuery.getLimit() != Long.MAX_VALUE) {
+      return new BaseSequence<>(
+          new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
           {
             @Override
-            public ScanQueryNoLimitRowIterator make()
+            public ScanQueryLimitRowIterator make()
             {
-              return new ScanQueryNoLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
+              return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
             }
             @Override
-            public void cleanup(ScanQueryNoLimitRowIterator iterFromMake)
-            {
-              CloseQuietly.close(iterFromMake);
-            }
-          }.make();
-      return new BaseSequence(
-          new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
-          {
-            @Override
-            public ScanBatchedIterator make()
-            {
-              return new ScanBatchedIterator(
-                  sortAndLimitScanResultValues(scanResultIterator, scanQuery),
-                  scanQuery.getBatchSize()
-              );
-            }
-            @Override
-            public void cleanup(ScanBatchedIterator iterFromMake)
+            public void cleanup(ScanQueryLimitRowIterator iterFromMake)
             {
               CloseQuietly.close(iterFromMake);
             }
           });
-    } else {
-      throw new UOE(
-          "Time ordering for result set limit of %,d is not supported. Try lowering the "
-          + "result set size to less than or equal to the time ordering limit of %,d.",
-          scanQuery.getLimit(),
-          scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
-      );
     }
+    return runner.run(queryPlusWithNonNullLegacy, responseContext);
   };
 }
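
For readers unfamiliar with the idiom above: BaseSequence.IteratorMaker lets mergeResults hand back a sequence whose underlying iterator (here, ScanQueryLimitRowIterator) is created only when the results are actually consumed, with cleanup guaranteed afterwards. A self-contained sketch of that contract, with hypothetical names (this is not Druid's BaseSequence, which also handles accumulation):

import java.util.Iterator;
import java.util.function.Consumer;

final class LazySequenceSketch<T, IterType extends Iterator<T>>
{
  interface IteratorMaker<T, IterType extends Iterator<T>>
  {
    IterType make();            // invoked lazily, on first consumption

    void cleanup(IterType it);  // always invoked once consumption finishes
  }

  private final IteratorMaker<T, IterType> maker;

  LazySequenceSketch(IteratorMaker<T, IterType> maker)
  {
    this.maker = maker;
  }

  void forEach(Consumer<T> consumer)
  {
    IterType it = maker.make();
    try {
      while (it.hasNext()) {
        consumer.accept(it.next());
      }
    }
    finally {
      maker.cleanup(it); // mirrors CloseQuietly.close(iterFromMake) in the diff above
    }
  }
}

The net effect of the refactor is that the toolchest now only wraps the limit iterator for unordered, limited queries; everything else falls through to runner.run(), since time ordering is handled in ScanQueryRunnerFactory below.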

View File: ScanQueryRunnerFactory.java

@@ -19,13 +19,22 @@
 package org.apache.druid.query.scan;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
@@ -34,8 +43,15 @@ import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.segment.Segment;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
@@ -85,11 +101,38 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
       if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
-        // Use existing strategy
+        // Use normal strategy
         return Sequences.concat(
             Sequences.map(
                 Sequences.simple(queryRunners),
                 input -> input.run(queryPlus, responseContext)
             )
         );
+      } else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
+        // Use priority queue strategy
+        Sequence<ScanResultValue> queryResults = Sequences.concat(Sequences.map(
+            Sequences.simple(queryRunners),
+            input -> input.run(queryPlus, responseContext)
+        ));
+        return sortBatchAndLimitScanResultValues(queryResults, query);
+      } else if (numSegments <= scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()) {
+        // Use flatMerge strategy
+        return Sequences.map(
+            Sequences.simple(queryRunners),
+            (input) -> Sequences.concat(
+                Sequences.map(
+                    input.run(queryPlus, responseContext),
+                    srv -> Sequences.simple(srv.toSingleEventScanResultValues())
+                )
+            )
+        ).flatMerge(
+            seq -> seq,
+            Ordering.from(new ScanResultValueTimestampComparator(
+                query
+            ))
+        ).limit(
+            Math.toIntExact(query.getLimit())
+        );
+      } else {
+        throw new UOE(
+            "Time ordering for result set limit of %,d is not supported. Try lowering the "
@@ -98,17 +141,78 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
+            + "result set size to less than or equal to the time ordering limit of %,d.",
+            query.getLimit(),
+            scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
+        );
       }
-      return Sequences.concat(
-          Sequences.map(
-              Sequences.simple(queryRunners),
-              input -> input.run(queryPlus, responseContext)
-          )
-      );
     };
   }
+  @VisibleForTesting
+  Sequence<ScanResultValue> sortBatchAndLimitScanResultValues(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  )
+  {
+    Comparator<ScanResultValue> priorityQComparator =
+        new ScanResultValueTimestampComparator(scanQuery);
+    // Converting the limit from long to int could theoretically throw an ArithmeticException, but this
+    // method only runs when limit <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering(), which should be
+    // well below Integer.MAX_VALUE
+    int limit = Math.toIntExact(scanQuery.getLimit());
+    PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
+    Yielder<ScanResultValue> yielder = inputSequence.toYielder(
+        null,
+        new YieldingAccumulator<ScanResultValue, ScanResultValue>()
+        {
+          @Override
+          public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
+          {
+            yield();
+            return in;
+          }
+        }
+    );
+    while (!yielder.isDone()) {
+      ScanResultValue next = yielder.get();
+      List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+      for (ScanResultValue srv : singleEventScanResultValues) {
+        // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+        // needs to be preserved for queries using the compactedList result format
+        q.offer(srv);
+        if (q.size() > limit) {
+          q.poll();
+        }
+      }
+      yielder = yielder.next(null);
+    }
+    // Need to drain the queue into a Deque because PriorityQueue's iterator doesn't guarantee that the
+    // sorted order will be maintained
+    final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
+    while (q.size() != 0) {
+      // Add to the front of the deque because poll() removes the head of the queue, which belongs at
+      // the end of the sorted result
+      sortedElements.addFirst(q.poll());
+    }
+    // We can use an iterator here because all the results have been materialized for sorting
+    return new BaseSequence(
+        new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
+        {
+          @Override
+          public ScanBatchedIterator make()
+          {
+            return new ScanBatchedIterator(
+                sortedElements.iterator(),
+                scanQuery.getBatchSize()
+            );
+          }
+          @Override
+          public void cleanup(ScanBatchedIterator iterFromMake)
+          {
+            CloseQuietly.close(iterFromMake);
+          }
+        });
+  }
   @Override
   public QueryToolChest<ScanResultValue, ScanQuery> getToolchest()
   {
@@ -142,4 +246,46 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       return engine.process((ScanQuery) query, segment, responseContext);
     }
   }
+  /**
+   * This iterator supports iteration through any Iterator of unbatched ScanResultValues (1 event per
+   * ScanResultValue) and aggregates events into ScanResultValues with {@code batchSize} events each.
+   * The columns from the first event per batch will be used to populate the column section.
+   */
+  private static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
+  {
+    private final Iterator<ScanResultValue> itr;
+    private final int batchSize;
+    public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
+    {
+      this.itr = iterator;
+      this.batchSize = batchSize;
+    }
+    @Override
+    public void close() throws IOException
+    {
+    }
+    @Override
+    public boolean hasNext()
+    {
+      return itr.hasNext();
+    }
+    @Override
+    public ScanResultValue next()
+    {
+      // Build one batched ScanResultValue from up to batchSize single-event values
+      List<Object> eventsToAdd = new ArrayList<>(batchSize);
+      List<String> columns = new ArrayList<>();
+      while (eventsToAdd.size() < batchSize && itr.hasNext()) {
+        ScanResultValue srv = itr.next();
+        // Set the column list once, using the columns from the first event in the batch
+        columns = columns.isEmpty() ? srv.getColumns() : columns;
+        eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
+      }
+      return new ScanResultValue(null, columns, eventsToAdd);
+    }
+  }
 }
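
mergeRunners now picks one of three strategies: plain concatenation when no time ordering is requested, an in-memory priority queue when the limit fits within maxRowsQueuedForTimeOrdering, and a k-way flatMerge of per-segment (already time-sorted) sequences when the segment count fits within maxSegmentsTimeOrderedInMemory. The heart of the second strategy is the bounded-priority-queue pattern in sortBatchAndLimitScanResultValues: keep at most limit rows, evict the queue's head on overflow, then drain with addFirst to recover fully sorted order. A self-contained sketch of that pattern, with longs standing in for ScanResultValues and a natural-order comparator standing in for ScanResultValueTimestampComparator:

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

final class TopKSketch
{
  static Deque<Long> topK(long[] timestamps, int limit, Comparator<Long> comparator)
  {
    PriorityQueue<Long> q = new PriorityQueue<>(limit, comparator);
    for (long t : timestamps) {
      q.offer(t);
      if (q.size() > limit) {
        q.poll(); // evict the head: the retained row that would sort last
      }
    }
    // PriorityQueue's iterator is unordered, so drain via poll(); poll() returns elements in
    // ascending comparator order, and addFirst reverses that into the final output order.
    Deque<Long> sorted = new ArrayDeque<>(q.size());
    while (!q.isEmpty()) {
      sorted.addFirst(q.poll());
    }
    return sorted;
  }

  public static void main(String[] args)
  {
    // Keep the 3 latest timestamps, newest first: prints [50, 40, 30]
    System.out.println(topK(new long[]{10, 50, 30, 20, 40}, 3, Comparator.naturalOrder()));
  }
}

Because the heap never holds more than limit elements, memory stays O(limit) no matter how many rows the segments produce, which is what makes this strategy safe below the configured row threshold.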

View File: ScanResultValue.java

@@ -25,6 +25,8 @@ import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.segment.column.ColumnHolder;
 import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +87,17 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat());
   }
+  public List<ScanResultValue> toSingleEventScanResultValues()
+  {
+    List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
+    List<Object> events = (List<Object>) this.getEvents();
+    for (Object event : events) {
+      singleEventScanResultValues.add(new ScanResultValue(segmentId, columns, Collections.singletonList(event)));
+    }
+    return singleEventScanResultValues;
+  }
   @Override
   public boolean equals(Object o)
   {
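
A hypothetical usage example of the new method (the values here are illustrative, not from the commit): a batched ScanResultValue holding three events becomes three single-event values sharing the same segment id and column list, which is what lets the priority-queue and flatMerge strategies order individual rows:

// Sketch only; plain strings stand in for real scan events, and the segment id is made up.
Object event1 = "row1", event2 = "row2", event3 = "row3";
ScanResultValue batched = new ScanResultValue(
    "segment_2019-02-21",                  // hypothetical segment id
    Arrays.asList("timestamp", "page"),    // column list shared by every event
    Arrays.asList(event1, event2, event3)  // three events in one batch
);
List<ScanResultValue> singles = batched.toSingleEventScanResultValues();
// singles.size() == 3; each element wraps exactly one event via Collections.singletonList
// and keeps the original segmentId and column list.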

View File: DoubleStorageTest.java

@@ -94,7 +94,8 @@ public class DoubleStorageTest
   private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
       scanQueryQueryToolChest,
-      new ScanQueryEngine()
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
   );
   private Druids.ScanQueryBuilder newTestQuery()

View File: MultiSegmentScanQueryTest.java

@@ -73,7 +73,8 @@ public class MultiSegmentScanQueryTest
   private static final QueryRunnerFactory<ScanResultValue, ScanQuery> factory = new ScanQueryRunnerFactory(
       toolChest,
-      new ScanQueryEngine()
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
   );
   // time modified version of druid.sample.numeric.tsv

View File: ScanQueryRunnerTest.java

@@ -122,7 +122,8 @@ public class ScanQueryRunnerTest
       QueryRunnerTestHelper.makeQueryRunners(
           new ScanQueryRunnerFactory(
               toolChest,
-              new ScanQueryEngine()
+              new ScanQueryEngine(),
+              new ScanQueryConfig()
           )
       ),
       ImmutableList.of(false, true)

View File: CalciteTests.java

@@ -509,7 +509,8 @@ public class CalciteTests
               new ScanQueryConfig(),
               new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
           ),
-          new ScanQueryEngine()
+          new ScanQueryEngine(),
+          new ScanQueryConfig()
       )
   )
   .put(
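
All four test-file changes above reflect the same signature change: ScanQueryRunnerFactory now also takes a ScanQueryConfig, so mergeRunners can consult the time-ordering thresholds. A sketch of the new wiring (a fragment; toolchest construction is abbreviated and variable names are illustrative):

// ScanQueryConfig carries maxRowsQueuedForTimeOrdering and maxSegmentsTimeOrderedInMemory.
ScanQueryConfig scanQueryConfig = new ScanQueryConfig();
ScanQueryRunnerFactory factory = new ScanQueryRunnerFactory(
    toolChest,           // a ScanQueryQueryToolChest built as in the diffs above
    new ScanQueryEngine(),
    scanQueryConfig      // new third constructor argument introduced by this commit
);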