Change so batching only occurs on the broker for time-ordered scans

Restricted batching to the broker for time-ordered queries and adjusted tests

Formatting

Cleanup
Justin Borromeo 2019-02-26 13:16:44 -08:00
parent 451e2b4365
commit 18cce9a646
6 changed files with 64 additions and 197 deletions

View File

@@ -107,6 +107,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
}
}
/**
* This context flag indicates whether the query is running on a Broker or a Historical. Specifically, it is
* used to perform batching exclusively at the Broker level for time-ordered scans.
*/
public static final String CTX_KEY_OUTERMOST = "scanOutermost";
private final VirtualColumns virtualColumns;
private final ResultFormat resultFormat;
private final int batchSize;
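To make the flag's intent concrete, here is a minimal sketch (not part of this commit; shouldRebatchSingleEvents is a hypothetical helper) of how CTX_KEY_OUTERMOST is consulted by the limit iterator further down. The flag defaults to true, so the outermost runner (the Broker) re-batches; ScanQueryRunnerFactory sets it to false before fanning out, so Historicals skip batching for time-ordered scans.

// Sketch only, not in this diff; assumes the accessors used below in
// ScanQueryLimitRowIterator#next().
private static boolean shouldRebatchSingleEvents(ScanQuery query)
{
  // An absent flag defaults to true, so only the outermost (Broker) runner
  // re-batches; the factory sets the flag to false for per-segment runners.
  return query.getTimeOrder() != ScanQuery.TimeOrder.NONE
         && query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true);
}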

View File

@@ -19,6 +19,8 @@
package org.apache.druid.query.scan;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
@@ -27,15 +29,18 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private static final String TIME_ORDERING_SEGMENT_ID = "No segment ID available when using time ordering";
private Yielder<ScanResultValue> yielder;
private ScanQuery.ResultFormat resultFormat;
private long limit;
private long count = 0;
private ScanQuery query;
public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner,
@@ -43,11 +48,11 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
Map<String, Object> responseContext
)
{
ScanQuery query = (ScanQuery) queryPlus.getQuery();
resultFormat = query.getResultFormat();
limit = query.getLimit();
this.query = (ScanQuery) queryPlus.getQuery();
this.resultFormat = query.getResultFormat();
this.limit = query.getLimit();
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
yielder = baseSequence.toYielder(
this.yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@@ -70,9 +75,14 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
@Override
public ScanResultValue next()
{
ScanResultValue batch = yielder.get();
if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
// We don't want to perform batching at the Historical level if we're performing time ordering
if (query.getTimeOrder() == ScanQuery.TimeOrder.NONE ||
!query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true)) {
ScanResultValue batch = yielder.get();
List events = (List) batch.getEvents();
if (events.size() <= limit - count) {
count += events.size();
@@ -85,8 +95,21 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
count = limit;
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
}
} else {
// Perform single-event ScanResultValue batching. Each ScanResultValue in this case will only have one event,
// so there's no need to iterate through events.
int batchSize = query.getBatchSize();
List<Object> eventsToAdd = new ArrayList<>(batchSize);
List<String> columns = new ArrayList<>();
while (eventsToAdd.size() < batchSize && !yielder.isDone()) {
ScanResultValue srv = yielder.get();
// Only set the columns once, using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
yielder = yielder.next(null);
}
return new ScanResultValue(TIME_ORDERING_SEGMENT_ID, columns, eventsToAdd);
}
throw new UnsupportedOperationException(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
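The re-batching above turns n one-event ScanResultValues into ceil(n / batchSize) batches. A self-contained illustration of that grouping, using plain Java lists with made-up events instead of Druid types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: groups single-event results into batches of batchSize,
// mirroring what next() does with the yielder above.
public class BatchingSketch
{
  public static void main(String[] args)
  {
    List<String> singleEvents = Arrays.asList("e1", "e2", "e3", "e4", "e5");
    int batchSize = 2;
    List<List<String>> batches = new ArrayList<>();
    for (int i = 0; i < singleEvents.size(); i += batchSize) {
      batches.add(singleEvents.subList(i, Math.min(i + batchSize, singleEvents.size())));
    }
    System.out.println(batches); // [[e1, e2], [e3, e4], [e5]]: ceil(5 / 2) = 3 batches
  }
}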
@Override

View File

@@ -58,7 +58,8 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
return (queryPlus, responseContext) -> {
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
// the same way, even if they have different default legacy values.
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
final ScanQuery scanQuery = ((ScanQuery) (queryPlus.getQuery()))
.withNonNullLegacy(scanQueryConfig);
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()

View File

@@ -20,20 +20,15 @@
package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
@@ -42,9 +37,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.segment.Segment;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
@@ -99,6 +92,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
// See the comment on CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
queryPlus.getQuery().getContext().put(ScanQuery.CTX_KEY_OUTERMOST, false);
if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
// Use normal strategy
return Sequences.concat(
@@ -109,7 +103,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
);
} else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
// Use priority queue strategy
return sortBatchAndLimitScanResultValues(
return sortAndLimitScanResultValues(
Sequences.concat(Sequences.map(
Sequences.simple(queryRunners),
input -> input.run(queryPlus, responseContext)
@@ -136,7 +130,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
Math.toIntExact(query.getLimit())
);
return new ScanResultValueBatchingSequence(unbatched, query.getBatchSize());
return unbatched;
} else if (query.getLimit() > scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
throw new UOE(
"Time ordering for query result set limit of %,d is not supported. Try lowering the result "
@@ -155,7 +149,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
}
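Condensed, the dispatch in the hunks above now reads roughly as follows (a paraphrased sketch: runNormally and runTimeOrdered are hypothetical stand-ins for the Sequences.concat plumbing, not methods in this class). The key change is that the priority-queue path returns unbatched values and leaves batching to the Broker's limit iterator.

// Paraphrased sketch of the strategy dispatch; runNormally/runTimeOrdered
// are hypothetical stand-ins for the sequence plumbing shown above.
if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
  return runNormally(queryPlus, responseContext); // per-segment batches pass through unchanged
} else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
  // Sorted and limited but unbatched; ScanQueryLimitRowIterator batches on the Broker
  return sortAndLimitScanResultValues(runTimeOrdered(queryPlus, responseContext), query);
} else {
  throw new UOE("Time ordering for query result set limit of %,d is not supported", query.getLimit());
}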
@VisibleForTesting
Sequence<ScanResultValue> sortBatchAndLimitScanResultValues(
Sequence<ScanResultValue> sortAndLimitScanResultValues(
Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery
)
@@ -200,25 +194,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
// addFirst is used since PriorityQueue#poll() dequeues the lowest-priority (timestamp-wise) events first.
sortedElements.addFirst(q.poll());
}
return new BaseSequence(
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
{
@Override
public ScanBatchedIterator make()
{
return new ScanBatchedIterator(
sortedElements.iterator(),
scanQuery.getBatchSize()
);
}
@Override
public void cleanup(ScanBatchedIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
});
return Sequences.simple(sortedElements);
}
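The poll()/addFirst interaction is easy to verify in isolation: a priority queue used for bounded top-N keeps its lowest-priority element at the head so it can be evicted cheaply, which means draining it with poll() yields events in reverse of the desired order. A standalone illustration with made-up timestamps:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;

// Illustration only: draining a min-at-head queue with poll() while adding
// to the front of a deque yields a descending order with no reverse pass.
public class PollOrderSketch
{
  public static void main(String[] args)
  {
    PriorityQueue<Long> q = new PriorityQueue<>(); // head = smallest timestamp
    q.add(3L);
    q.add(1L);
    q.add(2L);
    Deque<Long> sorted = new ArrayDeque<>();
    while (!q.isEmpty()) {
      sorted.addFirst(q.poll()); // poll() yields 1, 2, 3; addFirst gives [3, 2, 1]
    }
    System.out.println(sorted); // [3, 2, 1]: descending time order
  }
}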
@Override
@@ -254,137 +230,4 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
return engine.process((ScanQuery) query, segment, responseContext);
}
}
/**
* This iterator supports iteration through any Iterable of unbatched ScanResultValues (1 event/ScanResultValue) and
* aggregates events into ScanResultValues with {@code batchSize} events. The columns from the first event per
* ScanResultValue will be used to populate the column section.
*/
@VisibleForTesting
static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
{
private final Iterator<ScanResultValue> itr;
private final int batchSize;
public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
{
this.itr = iterator;
this.batchSize = batchSize;
}
@Override
public void close() throws IOException
{
}
@Override
public boolean hasNext()
{
return itr.hasNext();
}
@Override
public ScanResultValue next()
{
ScanResultValue srv = itr.next();
return srv;
// Create new ScanResultValue from event map
/*
List<Object> eventsToAdd = new ArrayList<>(batchSize);
List<String> columns = new ArrayList<>();
while (eventsToAdd.size() < batchSize && itr.hasNext()) {
ScanResultValue srv = itr.next();
// Only replace once using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
}
return new ScanResultValue(null, columns, eventsToAdd);
*/
}
}
@VisibleForTesting
static class ScanResultValueBatchingSequence extends YieldingSequenceBase<ScanResultValue>
{
Yielder<ScanResultValue> inputYielder;
int batchSize;
public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize)
{
this.inputYielder = inputSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
{
yield();
return in;
}
}
);
this.batchSize = batchSize;
}
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue,
YieldingAccumulator<OutType, ScanResultValue> accumulator
)
{
return makeYielder(initValue, accumulator);
}
private <OutType> Yielder<OutType> makeYielder(
OutType initVal,
final YieldingAccumulator<OutType, ScanResultValue> accumulator
)
{
return new Yielder<OutType>()
{
@Override
public OutType get()
{
ScanResultValue srv = inputYielder.get();
inputYielder = inputYielder.next(null);
return (OutType) srv;
/*
// Create new ScanResultValue from event map
List<Object> eventsToAdd = new ArrayList<>(batchSize);
List<String> columns = new ArrayList<>();
while (eventsToAdd.size() < batchSize && !inputYielder.isDone()) {
ScanResultValue srv = inputYielder.get();
// Only replace once using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
inputYielder = inputYielder.next(null);
}
try {
return (OutType) new ScanResultValue(null, columns, eventsToAdd);
}
catch (ClassCastException e) {
return initVal;
}*/
}
@Override
public Yielder<OutType> next(OutType initValue)
{
accumulator.reset();
return makeYielder(initValue, accumulator);
}
@Override
public boolean isDone()
{
return inputYielder.isDone();
}
@Override
public void close()
{
}
};
}
}
}

View File

@@ -21,7 +21,6 @@ package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@@ -38,8 +37,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
@@ -73,6 +72,7 @@ public class ScanQueryRunnerFactoryTest
.timeOrder(timeOrder)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.dataSource("some datasource")
.resultFormat(resultFormat)
.build();
this.resultFormat = resultFormat;
}
@@ -102,12 +102,12 @@ public class ScanQueryRunnerFactoryTest
}
@Test
public void testSortBatchAndLimitScanResultValues()
public void testSortAndLimitScanResultValues()
{
List<ScanResultValue> srvs = new ArrayList<>(numElements);
List<Long> expectedEventTimestamps = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
long timestamp = (long) (Math.random() * Long.MAX_VALUE);
long timestamp = ThreadLocalRandom.current().nextLong();
expectedEventTimestamps.add(timestamp);
srvs.add(generateOneEventScanResultValue(timestamp, resultFormat));
}
@@ -125,39 +125,33 @@ public class ScanQueryRunnerFactoryTest
});
Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
List<ScanResultValue> output =
factory.sortBatchAndLimitScanResultValues(
factory.sortAndLimitScanResultValues(
inputSequence,
query
).toList();
// check numBatches is as expected
int expectedNumBatches = (int) Math.ceil((double) numElements / query.getBatchSize());
Assert.assertEquals(expectedNumBatches, output.size());
// check no batch has more than batchSize elements
// check each scan result value has one event
for (ScanResultValue srv : output) {
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
Assert.assertTrue(getEventsCompactedListResultFormat(srv).size() <= query.getBatchSize());
Assert.assertTrue(getEventsCompactedListResultFormat(srv).size() == 1);
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Assert.assertTrue(getEventsListResultFormat(srv).size() <= query.getBatchSize());
Assert.assertTrue(getEventsListResultFormat(srv).size() == 1);
}
}
// check total # of rows <= limit
int numRows = 0;
for (ScanResultValue srv : output) {
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
numRows += getEventsCompactedListResultFormat(srv).size();
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
numRows += getEventsListResultFormat(srv).size();
} else {
throw new UOE("Invalid result format [%s] not supported", resultFormat.toString());
}
}
Assert.assertTrue(numRows <= query.getLimit());
Assert.assertTrue(output.size() <= query.getLimit());
// check ordering and values are correct
for (int i = 1; i < output.size(); i++) {
if (query.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)) {
Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) <
output.get(i - 1).getFirstEventTimestamp(resultFormat));
} else {
Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) >
output.get(i - 1).getFirstEventTimestamp(resultFormat));
}
}
}
private ScanResultValue generateOneEventScanResultValue(long timestamp, ScanQuery.ResultFormat resultFormat)

View File

@@ -197,7 +197,7 @@ public class ServerManager implements QuerySegmentWalker
);
return CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<T>(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),