diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 40b1a47a7a3..b4dcf11620a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -53,6 +53,7 @@ import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; +import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -141,6 +142,8 @@ public class RealtimePlumber implements Plumber private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%"; + private static final String SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment"; + public RealtimePlumber( DataSchema schema, @@ -274,6 +277,7 @@ public class RealtimePlumber implements Plumber @Override public QueryRunner getQueryRunner(final Query query) { + final boolean skipIncrementalSegment = query.getContextValue(SKIP_INCREMENTAL_SEGMENT, false); final QueryRunnerFactory> factory = conglomerate.findFactory(query); final QueryToolChest> toolchest = factory.getToolchest(); @@ -341,6 +345,10 @@ public class RealtimePlumber implements Plumber return new ReportTimelineMissingSegmentQueryRunner(descriptor); } + if (skipIncrementalSegment && !input.hasSwapped()) { + return new NoopQueryRunner(); + } + // Prevent the underlying segment from closing when its being iterated final ReferenceCountingSegment segment = input.getSegment(); final Closeable closeable = segment.increment();