Added time ordering to the scan benchmark

This commit is contained in:
Justin Borromeo 2019-02-04 14:36:18 -08:00
parent 12e51a2721
commit 01b25ed112
2 changed files with 66 additions and 70 deletions

View File

@ -91,7 +91,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/* Works with 4GB heap size or greater. Otherwise there's a good chance of an OOME. */ /* Works with 8GB heap size or greater. Otherwise there's a good chance of an OOME. */
@State(Scope.Benchmark) @State(Scope.Benchmark)
@Fork(value = 1) @Fork(value = 1)
@Warmup(iterations = 10) @Warmup(iterations = 10)
@ -104,15 +104,18 @@ public class ScanBenchmark
@Param({"1", "2"}) @Param({"1", "2"})
private int numProcessingThreads; private int numProcessingThreads;
@Param({"250000"}) @Param({"750000"})
private int rowsPerSegment; private int rowsPerSegment;
@Param({"basic.A"}) @Param({"basic.A"})
private String schemaAndQuery; private String schemaAndQuery;
@Param({"1000"}) @Param({"1000", "99999"})
private int limit; private int limit;
@Param({"none", "descending", "ascending"})
private static String timeOrdering;
private static final Logger log = new Logger(ScanBenchmark.class); private static final Logger log = new Logger(ScanBenchmark.class);
private static final ObjectMapper JSON_MAPPER; private static final ObjectMapper JSON_MAPPER;
private static final IndexMergerV9 INDEX_MERGER_V9; private static final IndexMergerV9 INDEX_MERGER_V9;
@ -178,7 +181,7 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder() return Druids.newScanQueryBuilder()
.dataSource("blah") .dataSource("blah")
.intervals(intervalSpec) .intervals(intervalSpec)
.timeOrder("none"); .timeOrder(timeOrdering);
} }
@ -205,7 +208,7 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder() return Druids.newScanQueryBuilder()
.filters(filter) .filters(filter)
.intervals(intervalSpec) .intervals(intervalSpec)
.timeOrder("none"); .timeOrder(timeOrdering);
} }
private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema) private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema)
@ -223,7 +226,7 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder() return Druids.newScanQueryBuilder()
.filters(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance())) .filters(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance()))
.intervals(intervalSpec) .intervals(intervalSpec)
.timeOrder("none"); .timeOrder(timeOrdering);
} }
private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema) private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema)
@ -245,7 +248,7 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder() return Druids.newScanQueryBuilder()
.filters(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null)) .filters(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null))
.intervals(intervalSpec) .intervals(intervalSpec)
.timeOrder("none"); .timeOrder(timeOrdering);
} }
@Setup @Setup

View File

@ -69,78 +69,71 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
@Override @Override
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner) public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{ {
return new QueryRunner<ScanResultValue>() return (queryPlus, responseContext) -> {
{ // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
@Override // the same way, even if they have different default legacy values.
public Sequence<ScanResultValue> run( final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
) BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker =
{ new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it {
// the same way, even if they have different default legacy values. @Override
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig); public ScanQueryLimitRowIterator make()
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); {
BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker = return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>() }
@Override
public void cleanup(ScanQueryLimitRowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
};
if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) ||
scanQuery.getLimit() > maxRowsForInMemoryTimeOrdering) {
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
}
return new BaseSequence<>(scanQueryLimitRowIteratorMaker);
} else if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
PriorityQueue q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
while (scanResultIterator.hasNext()) {
ScanResultValue next = scanResultIterator.next();
List<Object> events = (List<Object>) next.getEvents();
for (Object event : events) {
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
}
}
Iterator queueIterator = q.iterator();
return new BaseSequence(
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedQueueIterator>()
{ {
@Override @Override
public ScanQueryLimitRowIterator make() public ScanBatchedTimeOrderedQueueIterator make()
{ {
return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); return new ScanBatchedTimeOrderedQueueIterator(queueIterator, scanQuery.getBatchSize());
} }
@Override @Override
public void cleanup(ScanQueryLimitRowIterator iterFromMake) public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake)
{ {
CloseQuietly.close(iterFromMake); CloseQuietly.close(iterFromMake);
} }
}; });
} else {
if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) || throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder());
scanQuery.getLimit() > maxRowsForInMemoryTimeOrdering) {
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
}
return new BaseSequence<>(scanQueryLimitRowIteratorMaker);
} else if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
PriorityQueue<Object> q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
while (scanResultIterator.hasNext()) {
ScanResultValue next = scanResultIterator.next();
List<Object> events = (List<Object>) next.getEvents();
for (Object event : events) {
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
}
}
Iterator queueIterator = q.iterator();
return new BaseSequence(
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedQueueIterator>()
{
@Override
public ScanBatchedTimeOrderedQueueIterator make()
{
return new ScanBatchedTimeOrderedQueueIterator(queueIterator, scanQuery.getBatchSize());
}
@Override
public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
});
} else {
throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder());
}
} }
}; };
} }