Set up time ordering strategy decision tree

This commit is contained in:
Justin Borromeo 2019-02-21 15:13:33 -08:00
parent fba6b022f0
commit b13ff624a9
1 changed files with 36 additions and 29 deletions

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Query;
@ -45,15 +46,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
public static final String CTX_COUNT = "count";
private final ScanQueryQueryToolChest toolChest;
private final ScanQueryEngine engine;
private final ScanQueryConfig scanQueryConfig;
@Inject
public ScanQueryRunnerFactory(
ScanQueryQueryToolChest toolChest,
ScanQueryEngine engine
ScanQueryEngine engine,
ScanQueryConfig scanQueryConfig
)
{
this.toolChest = toolChest;
this.engine = engine;
this.scanQueryConfig = scanQueryConfig;
}
@Override
@ -69,13 +73,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
)
{
// in single thread and in jetty thread instead of processing thread
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery();
int numSegments = 0;
final Iterator<QueryRunner<ScanResultValue>> segmentIt = queryRunners.iterator();
for (; segmentIt.hasNext(); numSegments++) {
@ -85,20 +84,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
// Use existing strategy
} else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
// Use priority queue strategy
} else if (numSegments <= scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()) {
// Use flatMerge strategy
} else {
throw new UOE(
"Time ordering for result set limit of %,d is not supported. Try lowering the "
+ "result set size to less than or equal to the time ordering limit of %,d.",
query.getLimit(),
scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
);
}
return Sequences.concat(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<ScanResultValue>, Sequence<ScanResultValue>>()
{
@Override
public Sequence<ScanResultValue> apply(final QueryRunner<ScanResultValue> input)
{
return input.run(queryPlus, responseContext);
}
}
input -> input.run(queryPlus, responseContext)
)
);
}
};
}