Fix CombiningFirehoseFactory with IngestSegmentFirehoseFactory in IndexTask (#6065)

* Fix CombiningFirehoseFactory with IngestSegmentFirehoseFactory in IndexTask

* Make recursive
This commit is contained in:
Jonathan Wei 2018-07-31 15:30:44 -07:00 committed by GitHub
parent ea72907365
commit 91943a24db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 4 deletions

View File

@ -85,6 +85,7 @@ import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
import io.druid.server.security.Action;
import io.druid.server.security.AuthorizerMapper;
@ -418,10 +419,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
// pass toolbox to Firehose
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
}
setFirehoseFactoryToolbox(firehoseFactory, toolbox);
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
@ -477,6 +475,25 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
}
// pass toolbox to any IngestSegmentFirehoseFactory
private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
{
if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
return;
}
if (firehoseFactory instanceof CombiningFirehoseFactory) {
for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
} else if (delegateFactory instanceof CombiningFirehoseFactory) {
setFirehoseFactoryToolbox(delegateFactory, toolbox);
}
}
}
}
private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(