Optimized n-way merge strategy

This commit is contained in:
Justin Borromeo 2019-03-21 13:16:58 -07:00
parent 42f5246b8d
commit 43d490cc3a
3 changed files with 84 additions and 48 deletions

View File

@ -123,7 +123,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
ScanResultValue srv = yielder.get();
// Only replace once using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
eventsToAdd.add(Iterables.getOnlyElement((List<Object>) srv.getEvents()));
yielder = yielder.next(null);
count++;
}

View File

@ -20,11 +20,13 @@
package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -38,18 +40,19 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
@ -88,11 +91,16 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// in single thread and in jetty thread instead of processing thread
return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery();
int numSegments = 0;
final Iterator<QueryRunner<ScanResultValue>> segmentIt = queryRunners.iterator();
for (; segmentIt.hasNext(); numSegments++) {
segmentIt.next();
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
@ -113,16 +121,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
} else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
// Use priority queue strategy
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
return sortAndLimitScanResultValues(
return sortAndLimitScanResultValuesPriorityQueue(
Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext)
@ -130,42 +129,79 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
query,
descriptorsOrdered
);
} else if (numSegments <= scanQueryConfig.getMaxSegmentsOrderedInMemory()) {
// Use n-way merge strategy
final Sequence<ScanResultValue> unbatched =
Sequences.map(
Sequences.simple(queryRunners),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
query
)).reverse()
).limit(
Math.toIntExact(query.getLimit())
);
} else {
Preconditions.checkState(
descriptorsOrdered.size() == queryRunnersOrdered.size(),
"Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!"
);
return unbatched;
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
}
LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
descriptorsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
x -> x.lhs.getInterval(),
LinkedHashMap::new,
Collectors.toList()
));
int maxNumPartitionsInSegment =
partitionsGroupedByInterval.values()
.stream()
.map(x -> x.size())
.max(Comparator.comparing(Integer::valueOf)).get();
if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentsOrderedInMemory()) {
// Use n-way merge strategy
List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(descriptorsAndRunnersOrdered.size());
for (Map.Entry<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> entry :
partitionsGroupedByInterval.entrySet()) {
groupedRunners.add(entry.getValue().stream().map(x -> x.rhs).collect(Collectors.toList()));
}
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners), // Sequence of runnerGroups
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
query
)).reverse()
)
)
).limit(
Math.toIntExact(query.getLimit())
);
}
throw new UOE(
"Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+ " %,d partitions or lower the row limit below %,d.",
maxNumPartitionsInSegment,
query.getLimit(),
scanQueryConfig.getMaxSegmentsOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForOrdering()
);
}
throw new UOE(
"Time ordering for queries of %,d segments per historical and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer segments than the configurable segment limit of"
+ " %,d segments or lower the row limit below %,d.",
numSegments,
query.getLimit(),
scanQueryConfig.getMaxSegmentsOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForOrdering()
);
};
}
@VisibleForTesting
Sequence<ScanResultValue> sortAndLimitScanResultValues(
Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue(
Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery,
List<SegmentDescriptor> descriptorsOrdered

View File

@ -119,7 +119,7 @@ public class ScanQueryRunnerFactoryTest
});
Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
List<ScanResultValue> output =
factory.sortAndLimitScanResultValues(
factory.sortAndLimitScanResultValuesPriorityQueue(
inputSequence,
query,
null