diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index d4be7db19d5..c1fc01fd8a5 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -28,6 +28,7 @@ import com.google.common.io.Closeables; import com.metamx.common.exception.FormattedException; import com.metamx.druid.Query; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.input.InputRow; import com.metamx.druid.merger.common.TaskLock; @@ -37,18 +38,20 @@ import com.metamx.druid.merger.common.actions.LockAcquireAction; import com.metamx.druid.merger.common.actions.LockListAction; import com.metamx.druid.merger.common.actions.LockReleaseAction; import com.metamx.druid.merger.common.actions.SegmentInsertAction; +import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.QueryRunner; +import com.metamx.druid.query.QueryRunnerFactory; +import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.realtime.FireDepartmentConfig; import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.GracefulShutdownFirehose; import com.metamx.druid.realtime.MinTimeFirehose; +import com.metamx.druid.realtime.Schema; +import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; -import com.metamx.druid.realtime.Schema; -import com.metamx.druid.coordination.DataSegmentAnnouncer; -import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.VersioningPolicy; import com.metamx.emitter.EmittingLogger; @@ -84,6 +87,9 @@ public class RealtimeIndexTask extends AbstractTask @JsonIgnore private volatile Plumber plumber = null; + @JsonIgnore + private volatile TaskToolbox toolbox = null; + @JsonIgnore private volatile GracefulShutdownFirehose firehose = null; @@ -141,7 +147,10 @@ public class RealtimeIndexTask extends AbstractTask public QueryRunner getQueryRunner(Query query) { if (plumber != null) { - return plumber.getQueryRunner(query); + QueryRunnerFactory> factory = toolbox.getQueryRunnerFactoryConglomerate().findFactory(query); + QueryToolChest> toolChest = factory.getToolchest(); + + return new FinalizeResultsQueryRunner(plumber.getQueryRunner(query), toolChest); } else { return null; } @@ -257,6 +266,7 @@ public class RealtimeIndexTask extends AbstractTask realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView()); realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter()); + this.toolbox = toolbox; this.plumber = realtimePlumberSchool.findPlumber(schema, metrics); try { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index 56b18fdd691..0c93ce923a3 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -225,7 +225,7 @@ public class RealtimeManager implements QuerySegmentWalker QueryRunnerFactory> factory = conglomerate.findFactory(query); QueryToolChest> toolChest = factory.getToolchest(); - return new FinalizeResultsQueryRunner(toolChest.mergeResults(plumber.getQueryRunner(query)), toolChest); + return new FinalizeResultsQueryRunner(plumber.getQueryRunner(query), toolChest); } public void close() throws IOException diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index a7dec6c642a..21a28062537 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -230,10 +230,11 @@ public class RealtimePlumberSchool implements PlumberSchool public QueryRunner getQueryRunner(final Query query) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); + final QueryToolChest> toolchest = factory.getToolchest(); + final Function, ServiceMetricEvent.Builder> builderFn = new Function, ServiceMetricEvent.Builder>() { - private final QueryToolChest> toolchest = factory.getToolchest(); @Override public ServiceMetricEvent.Builder apply(@Nullable Query input) @@ -242,37 +243,39 @@ public class RealtimePlumberSchool implements PlumberSchool } }; - return factory.mergeRunners( - EXEC, - FunctionalIterable - .create(sinks.values()) - .transform( - new Function>() - { - @Override - public QueryRunner apply(@Nullable Sink input) - { - return new MetricsEmittingQueryRunner( - emitter, - builderFn, - factory.mergeRunners( - EXEC, - Iterables.transform( - input, - new Function>() - { - @Override - public QueryRunner apply(@Nullable FireHydrant input) - { - return factory.createRunner(input.getSegment()); - } - } + return toolchest.mergeResults( + factory.mergeRunners( + EXEC, + FunctionalIterable + .create(sinks.values()) + .transform( + new Function>() + { + @Override + public QueryRunner apply(@Nullable Sink input) + { + return new MetricsEmittingQueryRunner( + emitter, + builderFn, + factory.mergeRunners( + EXEC, + Iterables.transform( + input, + new Function>() + { + @Override + public QueryRunner apply(@Nullable FireHydrant input) + { + return factory.createRunner(input.getSegment()); + } + } + ) ) - ) - ); - } - } - ) + ); + } + } + ) + ) ); }