mirror of https://github.com/apache/druid.git
Change so batching only occurs on broker for time-ordered scans
Restricted batching to the broker for time-ordered queries, adjusted the tests, and cleaned up formatting.
parent 451e2b4365
commit 18cce9a646
ScanQuery.java

@@ -107,6 +107,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     }
   }

+  /**
+   * This context flag corresponds to whether the query is running on a Broker or Historical. Specifically, this is
+   * used to perform batching exclusively at the broker level for time-ordered scans.
+   */
+  public static final String CTX_KEY_OUTERMOST = "scanOutermost";
+
   private final VirtualColumns virtualColumns;
   private final ResultFormat resultFormat;
   private final int batchSize;
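The flag above drives the broker-only batching in the rest of this diff. As a minimal, self-contained sketch of the intended semantics, assuming a plain Map<String, Object> query context (none of these names are Druid API except the key itself, which mirrors the getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true) call appearing later in this commit):

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the flag's intent: the broker leaves
// "scanOutermost" unset (defaulting to true), while queries forwarded to
// historicals carry scanOutermost=false, so only the broker batches.
public class ScanOutermostFlagSketch
{
  static final String CTX_KEY_OUTERMOST = "scanOutermost";

  static boolean shouldBatchHere(Map<String, Object> queryContext)
  {
    // Mirrors query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true):
    // an absent flag means "outermost", i.e. we are on the broker.
    Object v = queryContext.get(CTX_KEY_OUTERMOST);
    return v == null || Boolean.parseBoolean(v.toString());
  }

  public static void main(String[] args)
  {
    Map<String, Object> brokerCtx = new HashMap<>();      // flag unset on the broker
    Map<String, Object> historicalCtx = new HashMap<>();
    historicalCtx.put(CTX_KEY_OUTERMOST, false);          // set before fanning out

    System.out.println(shouldBatchHere(brokerCtx));       // true  -> batch on the broker
    System.out.println(shouldBatchHere(historicalCtx));   // false -> skip batching
  }
}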
ScanQueryLimitRowIterator.java

@@ -19,6 +19,8 @@

 package org.apache.druid.query.scan;

+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.YieldingAccumulator;
@@ -27,15 +29,18 @@ import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

 public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
 {
+  private static final String TIME_ORDERING_SEGMENT_ID = "No segment ID available when using time ordering";
   private Yielder<ScanResultValue> yielder;
   private ScanQuery.ResultFormat resultFormat;
   private long limit;
   private long count = 0;
+  private ScanQuery query;

   public ScanQueryLimitRowIterator(
       QueryRunner<ScanResultValue> baseRunner,
@@ -43,11 +48,11 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
       Map<String, Object> responseContext
   )
   {
-    ScanQuery query = (ScanQuery) queryPlus.getQuery();
-    resultFormat = query.getResultFormat();
-    limit = query.getLimit();
+    this.query = (ScanQuery) queryPlus.getQuery();
+    this.resultFormat = query.getResultFormat();
+    this.limit = query.getLimit();
     Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
-    yielder = baseSequence.toYielder(
+    this.yielder = baseSequence.toYielder(
         null,
         new YieldingAccumulator<ScanResultValue, ScanResultValue>()
         {
@@ -70,9 +75,14 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
   @Override
   public ScanResultValue next()
   {
-    ScanResultValue batch = yielder.get();
-    if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
-        ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+      throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    }
+
+    // We don't want to perform batching at the historical-level if we're performing time ordering
+    if (query.getTimeOrder() == ScanQuery.TimeOrder.NONE ||
+        !query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true)) {
+      ScanResultValue batch = yielder.get();
       List events = (List) batch.getEvents();
       if (events.size() <= limit - count) {
         count += events.size();
@@ -85,8 +95,21 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
         count = limit;
         return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
       }
-    }
-    throw new UnsupportedOperationException(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    } else {
+      // Perform single-event ScanResultValue batching. Each scan result value in this case will only have one event
+      // so there's no need to iterate through events.
+      int batchSize = query.getBatchSize();
+      List<Object> eventsToAdd = new ArrayList<>(batchSize);
+      List<String> columns = new ArrayList<>();
+      while (eventsToAdd.size() < batchSize && !yielder.isDone()) {
+        ScanResultValue srv = yielder.get();
+        // Only replace once using the columns from the first event
+        columns = columns.isEmpty() ? srv.getColumns() : columns;
+        eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
+        yielder = yielder.next(null);
+      }
+      return new ScanResultValue(TIME_ORDERING_SEGMENT_ID, columns, eventsToAdd);
+    }
   }

   @Override
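The new else-branch above is the broker-side batching: for time-ordered scans each incoming ScanResultValue carries exactly one event, and the loop regroups them into batches of at most batchSize. A standalone sketch of that regrouping with hypothetical names and plain Java collections (not the Druid types):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch: group records that each hold exactly one event into batches of at
// most batchSize, analogous to the while-loop in next() above.
public class SingleEventBatcher
{
  static <T> List<List<T>> batch(Iterator<T> singleEvents, int batchSize)
  {
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>(batchSize);
    while (singleEvents.hasNext()) {
      current.add(singleEvents.next());
      if (current.size() == batchSize) {  // batch full: start a new one
        batches.add(current);
        current = new ArrayList<>(batchSize);
      }
    }
    if (!current.isEmpty()) {
      batches.add(current);               // final partial batch
    }
    return batches;
  }

  public static void main(String[] args)
  {
    List<Integer> events = Arrays.asList(1, 2, 3, 4, 5);
    System.out.println(batch(events.iterator(), 2));  // [[1, 2], [3, 4], [5]]
  }
}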
ScanQueryQueryToolChest.java

@@ -58,7 +58,8 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
     return (queryPlus, responseContext) -> {
       // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
       // the same way, even if they have different default legacy values.
-      final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
+      final ScanQuery scanQuery = ((ScanQuery) (queryPlus.getQuery()))
+          .withNonNullLegacy(scanQueryConfig);
       final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
       return new BaseSequence<>(
           new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
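The reformatted call above keeps the behavior described in the comment: resolve a nullable per-query "legacy" flag against the local config exactly once, so every node the query is forwarded to sees the same concrete value. A rough sketch of that normalization idea with hypothetical types (not the Druid classes):

// Sketch of the "non-null legacy" idea: only the first node that sees a null
// flag fills it in from its own config; afterwards the resolved value travels
// with the query and later nodes' differing defaults are ignored.
public class NonNullLegacySketch
{
  static final class Config
  {
    final boolean legacyDefault;

    Config(boolean legacyDefault)
    {
      this.legacyDefault = legacyDefault;
    }
  }

  static Boolean withNonNullLegacy(Boolean queryLegacy, Config config)
  {
    return queryLegacy != null ? queryLegacy : config.legacyDefault;
  }

  public static void main(String[] args)
  {
    System.out.println(withNonNullLegacy(null, new Config(true)));   // true (filled from config)
    System.out.println(withNonNullLegacy(false, new Config(true)));  // false (preserved)
  }
}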
ScanQueryRunnerFactory.java

@@ -20,20 +20,15 @@
 package org.apache.druid.query.scan;

 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.YieldingAccumulator;
-import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
@@ -42,9 +37,7 @@ import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.segment.Segment;

-import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
-import java.util.Iterator;
@@ -99,6 +92,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       // See the comment of CTX_TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
+      queryPlus.getQuery().getContext().put(ScanQuery.CTX_KEY_OUTERMOST, false);
       if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
         // Use normal strategy
         return Sequences.concat(
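Setting CTX_KEY_OUTERMOST to false here, before the query fans out to the per-segment runners, is what tells downstream ScanQueryLimitRowIterator instances to skip batching. The three-way dispatch that follows in run() can be sketched roughly as below (placeholder names and values, not the Druid signatures):

// Rough sketch of the dispatch: unordered scans stream through unchanged,
// small time-ordered scans are sorted in memory via the priority-queue
// strategy, and oversized time-ordered scans are rejected.
public final class TimeOrderedScanDispatchSketch
{
  enum TimeOrder { NONE, ASCENDING, DESCENDING }

  static String chooseStrategy(TimeOrder order, long limit, long maxRowsQueuedForTimeOrdering)
  {
    if (order == TimeOrder.NONE) {
      return "concat";           // normal strategy: no sort, no broker batching needed
    } else if (limit <= maxRowsQueuedForTimeOrdering) {
      return "priority-queue";   // sortAndLimitScanResultValues(...)
    } else {
      throw new UnsupportedOperationException("Time ordering not supported for limit " + limit);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(chooseStrategy(TimeOrder.NONE, 1_000_000, 100_000));     // concat
    System.out.println(chooseStrategy(TimeOrder.DESCENDING, 50_000, 100_000));  // priority-queue
  }
}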
@@ -109,7 +103,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       );
     } else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
       // Use priority queue strategy
-      return sortBatchAndLimitScanResultValues(
+      return sortAndLimitScanResultValues(
           Sequences.concat(Sequences.map(
               Sequences.simple(queryRunners),
               input -> input.run(queryPlus, responseContext)
@@ -136,7 +130,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
           Math.toIntExact(query.getLimit())
       );

-      return new ScanResultValueBatchingSequence(unbatched, query.getBatchSize());
+      return unbatched;
     } else if (query.getLimit() > scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
       throw new UOE(
           "Time ordering for query result set limit of %,d is not supported. Try lowering the result "
@@ -155,7 +149,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
   }

   @VisibleForTesting
-  Sequence<ScanResultValue> sortBatchAndLimitScanResultValues(
+  Sequence<ScanResultValue> sortAndLimitScanResultValues(
       Sequence<ScanResultValue> inputSequence,
       ScanQuery scanQuery
   )
@@ -200,25 +194,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       // addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first.
       sortedElements.addFirst(q.poll());
     }
-
-    return new BaseSequence(
-        new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
-        {
-          @Override
-          public ScanBatchedIterator make()
-          {
-            return new ScanBatchedIterator(
-                sortedElements.iterator(),
-                scanQuery.getBatchSize()
-            );
-          }
-
-          @Override
-          public void cleanup(ScanBatchedIterator iterFromMake)
-          {
-            CloseQuietly.close(iterFromMake);
-          }
-        });
+    return Sequences.simple(sortedElements);
   }

   @Override
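sortAndLimitScanResultValues now returns the sorted single-event values directly via Sequences.simple, leaving batching to the broker's ScanQueryLimitRowIterator. The priority-queue-plus-deque pattern it relies on can be illustrated in isolation (assumed values; PriorityQueue#poll() returns the lowest-priority element first, hence the addFirst):

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

// Illustrative sketch (not the Druid code): keep the top-`limit` timestamps in
// a PriorityQueue whose head is the worst element still retained, then drain
// with addFirst so the deque ends up in the desired final order.
public class TopKTimestampsSketch
{
  public static void main(String[] args)
  {
    long limit = 3;
    long[] timestamps = {50L, 10L, 40L, 30L, 20L};

    // For descending output, the queue's head (poll order) is the smallest kept value.
    PriorityQueue<Long> q = new PriorityQueue<>(Comparator.naturalOrder());
    for (long t : timestamps) {
      q.offer(t);
      if (q.size() > limit) {
        q.poll();  // evict the smallest; only the `limit` largest survive
      }
    }

    // poll() dequeues low-priority (timestamp-wise) events first, so addFirst
    // reverses them into descending order, as in the comment above.
    Deque<Long> sorted = new ArrayDeque<>();
    while (!q.isEmpty()) {
      sorted.addFirst(q.poll());
    }
    System.out.println(sorted);  // [50, 40, 30]
  }
}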
@@ -254,137 +230,4 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       return engine.process((ScanQuery) query, segment, responseContext);
     }
   }
-
-  /**
-   * This iterator supports iteration through any Iterable of unbatched ScanResultValues (1 event/ScanResultValue) and
-   * aggregates events into ScanResultValues with {@code batchSize} events. The columns from the first event per
-   * ScanResultValue will be used to populate the column section.
-   */
-  @VisibleForTesting
-  static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
-  {
-    private final Iterator<ScanResultValue> itr;
-    private final int batchSize;
-
-    public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
-    {
-      this.itr = iterator;
-      this.batchSize = batchSize;
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-      return itr.hasNext();
-    }
-
-    @Override
-    public ScanResultValue next()
-    {
-      ScanResultValue srv = itr.next();
-      return srv;
-      // Create new ScanResultValue from event map
-      /*
-      List<Object> eventsToAdd = new ArrayList<>(batchSize);
-      List<String> columns = new ArrayList<>();
-      while (eventsToAdd.size() < batchSize && itr.hasNext()) {
-        ScanResultValue srv = itr.next();
-        // Only replace once using the columns from the first event
-        columns = columns.isEmpty() ? srv.getColumns() : columns;
-        eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
-      }
-      return new ScanResultValue(null, columns, eventsToAdd);
-      */
-    }
-  }
-
-  @VisibleForTesting
-  static class ScanResultValueBatchingSequence extends YieldingSequenceBase<ScanResultValue>
-  {
-    Yielder<ScanResultValue> inputYielder;
-    int batchSize;
-
-    public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize)
-    {
-      this.inputYielder = inputSequence.toYielder(
-          null,
-          new YieldingAccumulator<ScanResultValue, ScanResultValue>()
-          {
-            @Override
-            public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
-            {
-              yield();
-              return in;
-            }
-          }
-      );
-      this.batchSize = batchSize;
-    }
-
-    @Override
-    public <OutType> Yielder<OutType> toYielder(
-        OutType initValue,
-        YieldingAccumulator<OutType, ScanResultValue> accumulator
-    )
-    {
-      return makeYielder(initValue, accumulator);
-    }
-
-    private <OutType> Yielder<OutType> makeYielder(
-        OutType initVal,
-        final YieldingAccumulator<OutType, ScanResultValue> accumulator
-    )
-    {
-      return new Yielder<OutType>()
-      {
-        @Override
-        public OutType get()
-        {
-          ScanResultValue srv = inputYielder.get();
-          inputYielder = inputYielder.next(null);
-          return (OutType) srv;
-          /*
-          // Create new ScanResultValue from event map
-          List<Object> eventsToAdd = new ArrayList<>(batchSize);
-          List<String> columns = new ArrayList<>();
-          while (eventsToAdd.size() < batchSize && !inputYielder.isDone()) {
-            ScanResultValue srv = inputYielder.get();
-            // Only replace once using the columns from the first event
-            columns = columns.isEmpty() ? srv.getColumns() : columns;
-            eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
-            inputYielder = inputYielder.next(null);
-          }
-          try {
-            return (OutType) new ScanResultValue(null, columns, eventsToAdd);
-          }
-          catch (ClassCastException e) {
-            return initVal;
-          }*/
-        }
-
-        @Override
-        public Yielder<OutType> next(OutType initValue)
-        {
-          accumulator.reset();
-          return makeYielder(initValue, accumulator);
-        }
-
-        @Override
-        public boolean isDone()
-        {
-          return inputYielder.isDone();
-        }
-
-        @Override
-        public void close()
-        {
-        }
-      };
-    }
-  }
 }
ScanQueryRunnerFactoryTest.java

@@ -21,7 +21,6 @@ package org.apache.druid.query.scan;

 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@@ -38,8 +37,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;


 @RunWith(Parameterized.class)
@@ -73,6 +72,7 @@ public class ScanQueryRunnerFactoryTest
         .timeOrder(timeOrder)
         .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
         .dataSource("some datasource")
         .resultFormat(resultFormat)
         .build();
+    this.resultFormat = resultFormat;
   }
@@ -102,12 +102,12 @@ public class ScanQueryRunnerFactoryTest
   }

   @Test
-  public void testSortBatchAndLimitScanResultValues()
+  public void testSortAndLimitScanResultValues()
   {
     List<ScanResultValue> srvs = new ArrayList<>(numElements);
     List<Long> expectedEventTimestamps = new ArrayList<>();
     for (int i = 0; i < numElements; i++) {
-      long timestamp = (long) (Math.random() * Long.MAX_VALUE);
+      long timestamp = (long) (ThreadLocalRandom.current().nextLong());
       expectedEventTimestamps.add(timestamp);
       srvs.add(generateOneEventScanResultValue(timestamp, resultFormat));
     }
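The generator change above is behavioral, not just stylistic: Math.random() returns a double in [0, 1), so (long) (Math.random() * Long.MAX_VALUE) can never produce a negative timestamp and loses low-order bits to the double's 53-bit mantissa, whereas ThreadLocalRandom.current().nextLong() draws from the full long range without contending on a shared Random. A tiny illustrative sketch:

import java.util.concurrent.ThreadLocalRandom;

// Why the test switched generators: the double-based expression is always
// non-negative and coarse-grained; nextLong() covers all 2^64 values.
public class TimestampGenSketch
{
  public static void main(String[] args)
  {
    long viaDouble = (long) (Math.random() * Long.MAX_VALUE);  // >= 0, precision-limited
    long viaTlr = ThreadLocalRandom.current().nextLong();      // full long range
    System.out.println(viaDouble + " " + viaTlr);
  }
}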
@@ -125,39 +125,33 @@ public class ScanQueryRunnerFactoryTest
     });
     Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
     List<ScanResultValue> output =
-        factory.sortBatchAndLimitScanResultValues(
+        factory.sortAndLimitScanResultValues(
             inputSequence,
             query
         ).toList();

-    // check numBatches is as expected
-    int expectedNumBatches = (int) Math.ceil((double) numElements / query.getBatchSize());
-    Assert.assertEquals(expectedNumBatches, output.size());
-
-    // check no batch has more than batchSize elements
+    // check each scan result value has one event
     for (ScanResultValue srv : output) {
       if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
-        Assert.assertTrue(getEventsCompactedListResultFormat(srv).size() <= query.getBatchSize());
+        Assert.assertTrue(getEventsCompactedListResultFormat(srv).size() == 1);
       } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-        Assert.assertTrue(getEventsListResultFormat(srv).size() <= query.getBatchSize());
+        Assert.assertTrue(getEventsListResultFormat(srv).size() == 1);
       }
     }

     // check total # of rows <= limit
     int numRows = 0;
     for (ScanResultValue srv : output) {
       if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
         numRows += getEventsCompactedListResultFormat(srv).size();
       } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
         numRows += getEventsListResultFormat(srv).size();
       } else {
         throw new UOE("Invalid result format [%s] not supported", resultFormat.toString());
       }
     }
-    Assert.assertTrue(numRows <= query.getLimit());
+    Assert.assertTrue(output.size() <= query.getLimit());

     // check ordering and values are correct
     for (int i = 1; i < output.size(); i++) {
       if (query.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)) {
         Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) <
                           output.get(i - 1).getFirstEventTimestamp(resultFormat));
       } else {
         Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) >
                           output.get(i - 1).getFirstEventTimestamp(resultFormat));
       }
     }
   }

   private ScanResultValue generateOneEventScanResultValue(long timestamp, ScanQuery.ResultFormat resultFormat)
ServerManager.java

@@ -197,7 +197,7 @@ public class ServerManager implements QuerySegmentWalker
     );

     return CPUTimeMetricQueryRunner.safeBuild(
-        new FinalizeResultsQueryRunner<T>(
+        new FinalizeResultsQueryRunner<>(
             toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
             toolChest
         ),