diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 3efc39be9e8..ac1c94dae1c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -85,6 +85,7 @@ import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.firehose.ChatHandler; import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import io.druid.server.security.Action; import io.druid.server.security.AuthorizerMapper; @@ -418,10 +419,7 @@ public class IndexTask extends AbstractTask implements ChatHandler final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - // pass toolbox to Firehose - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - } + setFirehoseFactoryToolbox(firehoseFactory, toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. @@ -477,6 +475,25 @@ public class IndexTask extends AbstractTask implements ChatHandler } } + // pass toolbox to any IngestSegmentFirehoseFactory + private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox) + { + if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { + ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); + return; + } + + if (firehoseFactory instanceof CombiningFirehoseFactory) { + for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) { + if (delegateFactory instanceof IngestSegmentFirehoseFactory) { + ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox); + } else if (delegateFactory instanceof CombiningFirehoseFactory) { + setFirehoseFactoryToolbox(delegateFactory, toolbox); + } + } + } + } + private Map getTaskCompletionReports() { return TaskReport.buildTaskReports(