review comments

1) Rename firehose to IngestSegment
2) fix segment overlapping, intervals
3) fix overshadow
This commit is contained in:
nishantmonu51 2014-07-04 12:31:33 +05:30
parent 1de390801f
commit 97b58eb193
2 changed files with 23 additions and 26 deletions

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexing.firehose.DruidFirehoseFactory;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
@ -39,7 +39,7 @@ public class IndexingServiceFirehoseModule implements DruidModule
new SimpleModule("IndexingServiceFirehoseModule")
.registerSubtypes(
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(DruidFirehoseFactory.class, "druid")
new NamedType(IngestSegmentFirehoseFactory.class, "ingestSegment")
)
);
}

View File

@ -61,6 +61,7 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -72,9 +73,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(DruidFirehoseFactory.class);
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
private final String dataSource;
private final Interval interval;
private final DimFilter dimFilter;
@ -83,7 +84,7 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
private final Injector injector;
@JsonCreator
public DruidFirehoseFactory(
public IngestSegmentFirehoseFactory(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("filter") DimFilter dimFilter,
@ -155,6 +156,9 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
Ordering.<String>natural().nullsFirst()
);
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(
interval
);
for (DataSegment segment : usedSegments) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
@ -163,8 +167,8 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
dims = dimensions;
} else {
Set<String> dimSet = new HashSet<>();
for (DataSegment segment : usedSegments) {
dimSet.addAll(segment.getDimensions());
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
}
dims = Lists.newArrayList(dimSet);
}
@ -174,15 +178,15 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
metricsList = metrics;
} else {
Set<String> metricsSet = new HashSet<>();
for (DataSegment segment : usedSegments) {
metricsSet.addAll(segment.getMetrics());
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
}
metricsList = Lists.newArrayList(metricsSet);
}
final List<StorageAdapter> adapters = Lists.transform(
timeline.lookup(new Interval("1000-01-01/3000-01-01")),
timeLineSegments,
new Function<TimelineObjectHolder<String, DataSegment>, StorageAdapter>()
{
@Override
@ -234,11 +238,11 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
{
@Nullable
@Override
public Sequence<InputRow> apply(@Nullable StorageAdapter input)
public Sequence<InputRow> apply(@Nullable StorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
input.makeCursors(
adapter.makeCursors(
Filters.convertDimensionFilters(dimFilter),
interval,
QueryGranularity.ALL
@ -246,24 +250,24 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
{
@Nullable
@Override
public Sequence<InputRow> apply(@Nullable Cursor input)
public Sequence<InputRow> apply(@Nullable Cursor cursor)
{
TimestampColumnSelector timestampColumnSelector = input.makeTimestampColumnSelector();
TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = input.makeDimensionSelector(dim);
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim, dimSelector);
}
Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = input.makeObjectColumnSelector(metric);
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
}
List<InputRow> rowList = Lists.newArrayList();
while (!input.isDone()) {
while (!cursor.isDone()) {
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getTimestamp();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
@ -291,7 +295,7 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
theEvent.put(metric, selector.get());
}
rowList.add(new MapBasedInputRow(timestamp, dims, theEvent));
input.advance();
cursor.advance();
}
return Sequences.simple(rowList);
}
@ -333,14 +337,7 @@ public class DruidFirehoseFactory implements FirehoseFactory<InputRowParser>
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// Nothing to do.
}
};
return Runnables.getNoopRunnable();
}
@Override