Fix IngestSegmentFirehoseFactory (#4069)

This commit is contained in:
Zhihui Jiao 2017-03-18 06:57:25 +08:00 committed by Roman Leventov
parent 84fe91ba0b
commit 6febcd9f24
2 changed files with 32 additions and 9 deletions

View File

@ -55,6 +55,7 @@ import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.Comparators;
@ -168,6 +169,12 @@ public class IndexTask extends AbstractTask
.isPresent(); .isPresent();
final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
if (delegateFirehoseFactory instanceof IngestSegmentFirehoseFactory) {
// pass toolbox to Firehose
((IngestSegmentFirehoseFactory) delegateFirehoseFactory).setTaskToolbox(toolbox);
}
final FirehoseFactory firehoseFactory; final FirehoseFactory firehoseFactory;
if (ingestionSchema.getIOConfig().isSkipFirehoseCaching() if (ingestionSchema.getIOConfig().isSkipFirehoseCaching()
|| delegateFirehoseFactory instanceof ReplayableFirehoseFactory) { || delegateFirehoseFactory instanceof ReplayableFirehoseFactory) {
@ -290,7 +297,10 @@ public class IndexTask extends AbstractTask
hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
} }
List<Object> groupKey = Rows.toGroupKey(queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), inputRow); List<Object> groupKey = Rows.toGroupKey(
queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
inputRow
);
hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()); hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
} }
} }
@ -385,7 +395,12 @@ public class IndexTask extends AbstractTask
try ( try (
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema); final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema);
final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator, fireDepartmentMetrics); final FiniteAppenderatorDriver driver = newDriver(
appenderator,
toolbox,
segmentAllocator,
fireDepartmentMetrics
);
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser()) final Firehose firehose = firehoseFactory.connect(dataSchema.getParser())
) { ) {
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose); final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);

View File

@ -68,6 +68,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
private final List<String> metrics; private final List<String> metrics;
private final Injector injector; private final Injector injector;
private final IndexIO indexIO; private final IndexIO indexIO;
private TaskToolbox taskToolbox;
@JsonCreator @JsonCreator
public IngestSegmentFirehoseFactory( public IngestSegmentFirehoseFactory(
@ -121,21 +122,28 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
return metrics; return metrics;
} }
public void setTaskToolbox(TaskToolbox taskToolbox)
{
this.taskToolbox = taskToolbox;
}
@Override @Override
public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
{ {
log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval);
// better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method.
// Noop Task is just used to create the toolbox and list segments. if (taskToolbox == null) {
final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build( // Noop Task is just used to create the toolbox and list segments.
new NoopTask("reingest", 0, 0, null, null, null) taskToolbox = injector.getInstance(TaskToolboxFactory.class).build(
); new NoopTask("reingest", 0, 0, null, null, null)
);
}
try { try {
final List<DataSegment> usedSegments = toolbox final List<DataSegment> usedSegments = taskToolbox
.getTaskActionClient() .getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval, null)); .submit(new SegmentListUsedAction(dataSource, interval, null));
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments); final Map<DataSegment, File> segmentFileMap = taskToolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>( VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.<String>natural().nullsFirst() Ordering.<String>natural().nullsFirst()
); );