mirror of https://github.com/apache/druid.git
Ability to skip Incremental Index during query using query context
This PR adds adds the ability to skip incremental index when querying results from realtime nodes. default behaviour is to include incrementalIndex in queries. review comment
This commit is contained in:
parent
0fb7e4e040
commit
60f649dab1
|
@ -49,6 +49,7 @@ import io.druid.concurrent.Execs;
|
||||||
import io.druid.data.input.Committer;
|
import io.druid.data.input.Committer;
|
||||||
import io.druid.data.input.InputRow;
|
import io.druid.data.input.InputRow;
|
||||||
import io.druid.query.MetricsEmittingQueryRunner;
|
import io.druid.query.MetricsEmittingQueryRunner;
|
||||||
|
import io.druid.query.NoopQueryRunner;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
import io.druid.query.QueryRunnerFactory;
|
import io.druid.query.QueryRunnerFactory;
|
||||||
|
@ -132,6 +133,8 @@ public class RealtimePlumber implements Plumber
|
||||||
|
|
||||||
private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
|
private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
|
||||||
private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";
|
private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";
|
||||||
|
private static final String SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
|
||||||
|
|
||||||
|
|
||||||
public RealtimePlumber(
|
public RealtimePlumber(
|
||||||
DataSchema schema,
|
DataSchema schema,
|
||||||
|
@ -255,6 +258,7 @@ public class RealtimePlumber implements Plumber
|
||||||
@Override
|
@Override
|
||||||
public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
|
public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
|
||||||
{
|
{
|
||||||
|
final boolean skipIncrementalSegment = query.getContextValue(SKIP_INCREMENTAL_SEGMENT, false);
|
||||||
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
||||||
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
|
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
|
||||||
|
|
||||||
|
@ -322,6 +326,10 @@ public class RealtimePlumber implements Plumber
|
||||||
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor);
|
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (skipIncrementalSegment && !input.hasSwapped()) {
|
||||||
|
return new NoopQueryRunner<T>();
|
||||||
|
}
|
||||||
|
|
||||||
// Prevent the underlying segment from closing when its being iterated
|
// Prevent the underlying segment from closing when its being iterated
|
||||||
final Closeable closeable = input.getSegment().increment();
|
final Closeable closeable = input.getSegment().increment();
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue