From 1de390801fd1207388984b5a5684925138f01dc3 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 2 Jul 2014 20:34:18 +0530 Subject: [PATCH] Druid Firehose Add druidFirehose, can be used to reIndex filtered data from a datasource --- .../guice/IndexingServiceFirehoseModule.java | 6 +- .../firehose/DruidFirehoseFactory.java | 352 ++++++++++++++++++ 2 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java index b8609453f3d..55f94147b67 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java @@ -24,8 +24,9 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import io.druid.indexing.firehose.DruidFirehoseFactory; import io.druid.initialization.DruidModule; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import java.util.List; @@ -37,7 +38,8 @@ public class IndexingServiceFirehoseModule implements DruidModule return ImmutableList.of( new SimpleModule("IndexingServiceFirehoseModule") .registerSubtypes( - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(DruidFirehoseFactory.class, "druid") ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java new file mode 100644 index 00000000000..215ddd937e4 --- /dev/null 
+++ b/indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java @@ -0,0 +1,352 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.firehose; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Injector; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import com.metamx.common.parsers.ParseException; +import com.metamx.emitter.EmittingLogger; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import 
io.druid.data.input.impl.InputRowParser; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.actions.SegmentListUsedAction; +import io.druid.indexing.common.task.NoopTask; +import io.druid.query.filter.DimFilter; +import io.druid.query.select.EventHolder; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.IndexIO; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.StorageAdapter; +import io.druid.segment.TimestampColumnSelector; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.filter.Filters; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DruidFirehoseFactory implements FirehoseFactory +{ + private static final EmittingLogger log = new EmittingLogger(DruidFirehoseFactory.class); + private final String dataSource; + private final Interval interval; + private final DimFilter dimFilter; + private final List dimensions; + private final List metrics; + private final Injector injector; + + @JsonCreator + public DruidFirehoseFactory( + @JsonProperty("dataSource") final String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("filter") DimFilter dimFilter, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics, + @JacksonInject Injector injector + ) + { + Preconditions.checkNotNull(dataSource, "dataSource"); + 
Preconditions.checkNotNull(interval, "interval"); + this.dataSource = dataSource; + this.interval = interval; + this.dimFilter = dimFilter; + this.dimensions = dimensions; + this.metrics = metrics; + this.injector = injector; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty("filter") + public DimFilter getDimensionsFilter() + { + return dimFilter; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + @Override + public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException + { + log.info("Connecting firehose: DruidFirehose[%s,%s]", dataSource, interval); + // TODO: have a way to pass the toolbox to Firehose, The instance is initialized Lazily on connect method. + final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build( + new NoopTask( + "druid-firehose", + 0, + 0, + null, + null + ) + ); + + try { + final List usedSegments = toolbox + .getTaskActionClient() + .submit(new SegmentListUsedAction(dataSource, interval)); + final Map segmentFileMap = toolbox.fetchSegments(usedSegments); + VersionedIntervalTimeline timeline = new VersionedIntervalTimeline( + Ordering.natural().nullsFirst() + ); + for (DataSegment segment : usedSegments) { + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); + } + List dims; + if (dimensions != null) { + dims = dimensions; + } else { + Set dimSet = new HashSet<>(); + for (DataSegment segment : usedSegments) { + dimSet.addAll(segment.getDimensions()); + } + dims = Lists.newArrayList(dimSet); + } + + List metricsList; + if (metrics != null) { + metricsList = metrics; + } else { + Set metricsSet = new HashSet<>(); + for (DataSegment segment : usedSegments) { + metricsSet.addAll(segment.getMetrics()); + } + 
metricsList = Lists.newArrayList(metricsSet); + } + + + final List adapters = Lists.transform( + timeline.lookup(new Interval("1000-01-01/3000-01-01")), + new Function, StorageAdapter>() + { + @Override + public StorageAdapter apply(TimelineObjectHolder input) + { + final DataSegment segment = input.getObject().getChunk(0).getObject(); + final File file = Preconditions.checkNotNull( + segmentFileMap.get(segment), + "File for segment %s", segment.getIdentifier() + ); + + try { + return new QueryableIndexStorageAdapter((IndexIO.loadIndex(file))); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + } + ); + + return new DruidFirehose(adapters, dims, metricsList); + + } + catch (IOException e) { + throw Throwables.propagate(e); + } + catch (SegmentLoadingException e) { + throw Throwables.propagate(e); + } + + } + + @Override + public InputRowParser getParser() + { + return null; + } + + public class DruidFirehose implements Firehose + { + private volatile Yielder rowYielder; + + public DruidFirehose(List adapters, final List dims, final List metrics) + { + Sequence rows = Sequences.concat( + Iterables.transform( + adapters, new Function>() + { + @Nullable + @Override + public Sequence apply(@Nullable StorageAdapter input) + { + return Sequences.concat( + Sequences.map( + input.makeCursors( + Filters.convertDimensionFilters(dimFilter), + interval, + QueryGranularity.ALL + ), new Function>() + { + @Nullable + @Override + public Sequence apply(@Nullable Cursor input) + { + TimestampColumnSelector timestampColumnSelector = input.makeTimestampColumnSelector(); + + Map dimSelectors = Maps.newHashMap(); + for (String dim : dims) { + final DimensionSelector dimSelector = input.makeDimensionSelector(dim); + dimSelectors.put(dim, dimSelector); + } + + Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = input.makeObjectColumnSelector(metric); + metSelectors.put(metric, metricSelector); + } + + 
List rowList = Lists.newArrayList(); + while (!input.isDone()) { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.getTimestamp(); + theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + + for (Map.Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else { + List dimVals = Lists.newArrayList(); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + theEvent.put(metric, selector.get()); + } + rowList.add(new MapBasedInputRow(timestamp, dims, theEvent)); + input.advance(); + } + return Sequences.simple(rowList); + } + } + ) + ); + } + } + ) + ); + rowYielder = rows.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasMore() + { + return !rowYielder.isDone(); + } + + @Override + public InputRow nextRow() + { + final InputRow inputRow = rowYielder.get(); + rowYielder = rowYielder.next(null); + return inputRow; + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + // Nothing to do. + } + }; + } + + @Override + public void close() throws IOException + { + rowYielder.close(); + } + } +}