Merge pull request #627 from metamx/druid-firehose

Functionality to ingest a Druid segment and change the schema
This commit is contained in:
fjy 2014-07-17 13:41:16 -06:00
commit bc650a1c80
8 changed files with 710 additions and 5 deletions

View File

@ -24,8 +24,9 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.initialization.DruidModule; import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import java.util.List; import java.util.List;
@ -37,7 +38,8 @@ public class IndexingServiceFirehoseModule implements DruidModule
return ImmutableList.<Module>of( return ImmutableList.<Module>of(
new SimpleModule("IndexingServiceFirehoseModule") new SimpleModule("IndexingServiceFirehoseModule")
.registerSubtypes( .registerSubtypes(
new NamedType(EventReceiverFirehoseFactory.class, "receiver") new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(IngestSegmentFirehoseFactory.class, "ingestSegment")
) )
); );
} }

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.firehose.IngestTask;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
@ -53,7 +54,8 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class), @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class) @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class),
@JsonSubTypes.Type(name = "ingest-task", value = IngestTask.class)
}) })
public interface Task public interface Task
{ {

View File

@ -0,0 +1,372 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Injector;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.query.filter.DimFilter;
import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
private final String dataSource;
private final Interval interval;
private final DimFilter dimFilter;
private final List<String> dimensions;
private final List<String> metrics;
private final Injector injector;
@JsonCreator
public IngestSegmentFirehoseFactory(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JacksonInject Injector injector
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(interval, "interval");
this.dataSource = dataSource;
this.interval = interval;
this.dimFilter = dimFilter;
this.dimensions = dimensions;
this.metrics = metrics;
this.injector = injector;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public List<String> getMetrics()
{
return metrics;
}
@Override
public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
{
log.info("Connecting firehose: IngestSegmentFirehose[%s,%s]", dataSource, interval);
// better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method.
final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build(
new IngestTask("Ingest-Task-Id", dataSource)
);
try {
final List<DataSegment> usedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
Ordering.<String>natural().nullsFirst()
);
for (DataSegment segment : usedSegments) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(
interval
);
List<String> dims;
if (dimensions != null) {
dims = dimensions;
} else {
Set<String> dimSet = new HashSet<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
}
dims = Lists.newArrayList(dimSet);
}
List<String> metricsList;
if (metrics != null) {
metricsList = metrics;
} else {
Set<String> metricsSet = new HashSet<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
}
metricsList = Lists.newArrayList(metricsSet);
}
final List<StorageAdapter> adapters = Lists.transform(
timeLineSegments,
new Function<TimelineObjectHolder<String, DataSegment>, StorageAdapter>()
{
@Override
public StorageAdapter apply(TimelineObjectHolder<String, DataSegment> input)
{
final DataSegment segment = input.getObject().getChunk(0).getObject();
final File file = Preconditions.checkNotNull(
segmentFileMap.get(segment),
"File for segment %s", segment.getIdentifier()
);
try {
return new QueryableIndexStorageAdapter((IndexIO.loadIndex(file)));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
);
return new IngestSegmentFirehose(adapters, dims, metricsList);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (SegmentLoadingException e) {
throw Throwables.propagate(e);
}
}
@Override
public InputRowParser getParser()
{
return null;
}
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(List<StorageAdapter> adapters, final List<String> dims, final List<String> metrics)
{
Sequence<InputRow> rows = Sequences.concat(
Iterables.transform(
adapters, new Function<StorageAdapter, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(@Nullable StorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
Filters.convertDimensionFilters(dimFilter),
interval,
QueryGranularity.ALL
), new Function<Cursor, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(@Nullable final Cursor cursor)
{
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim, dimSelector);
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
}
return Sequences.simple(
new Iterable<InputRow>()
{
@Override
public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}
@Override
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getTimestamp();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}
@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
};
}
}
);
}
}
)
);
}
}
)
);
rowYielder = rows.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasMore()
{
return !rowYielder.isDone();
}
@Override
public InputRow nextRow()
{
final InputRow inputRow = rowYielder.get();
rowYielder = rowYielder.next(null);
return inputRow;
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
rowYielder.close();
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
public class IngestTask extends AbstractTask
{
public IngestTask(
@JsonProperty("id") final String id,
@JsonProperty("dataSource") final String dataSource
)
{
super(id, dataSource);
}
@Override
public String getType()
{
return "Ingest-Task";
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return TaskStatus.success(getId());
}
}

View File

@ -963,7 +963,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (multiValueRow.size() == 0) { if (multiValueRow.size() == 0) {
return null; return null;
} else if (multiValueRow.size() == 1) { } else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(1)); return columnVals.lookupName(multiValueRow.get(0));
} else { } else {
final String[] strings = new String[multiValueRow.size()]; final String[] strings = new String[multiValueRow.size()];
for (int i = 0 ; i < multiValueRow.size() ; i++) { for (int i = 0 ; i < multiValueRow.size() ; i++) {

View File

@ -26,6 +26,7 @@ import com.google.inject.Binder;
import io.druid.data.input.ProtoBufInputRowParser; import io.druid.data.input.ProtoBufInputRowParser;
import io.druid.initialization.DruidModule; import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@ -53,7 +54,8 @@ public class FirehoseModule implements DruidModule
new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"), new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver") new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining")
) )
); );
} }

View File

@ -0,0 +1,125 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
* Creates firehose that combines data from different Firehoses. Useful for ingesting data from multiple sources.
*/
public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private final List<FirehoseFactory> delegateFactoryList;
@JsonCreator
public CombiningFirehoseFactory(
@JsonProperty("delegates") List<FirehoseFactory> delegateFactoryList
)
{
Preconditions.checkArgument(!delegateFactoryList.isEmpty());
this.delegateFactoryList = delegateFactoryList;
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
return new CombiningFirehose(parser);
}
@Override
public InputRowParser getParser()
{
return delegateFactoryList.get(0).getParser();
}
@JsonProperty("delegates")
public List<FirehoseFactory> getDelegateFactoryList()
{
return delegateFactoryList;
}
public class CombiningFirehose implements Firehose
{
private final InputRowParser parser;
private final Iterator<FirehoseFactory> firehoseFactoryIterator;
private volatile Firehose currentFirehose;
public CombiningFirehose(InputRowParser parser) throws IOException
{
this.firehoseFactoryIterator = delegateFactoryList.iterator();
this.parser = parser;
nextFirehose();
}
private void nextFirehose()
{
if (firehoseFactoryIterator.hasNext()) {
try {
if (currentFirehose != null) {
currentFirehose.close();
}
currentFirehose = firehoseFactoryIterator.next().connect(parser);
}
catch (IOException e) {
Throwables.propagate(e);
}
}
}
@Override
public boolean hasMore()
{
return currentFirehose.hasMore();
}
@Override
public InputRow nextRow()
{
InputRow rv = currentFirehose.nextRow();
if (!currentFirehose.hasMore()) {
nextFirehose();
}
return rv;
}
@Override
public Runnable commit()
{
return currentFirehose.commit();
}
@Override
public void close() throws IOException
{
currentFirehose.close();
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.realtime.firehose;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.utils.Runnables;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class CombiningFirehoseFactoryTest
{
@Test
public void testCombiningfirehose() throws IOException
{
List<InputRow> list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2));
List<InputRow> list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4));
FirehoseFactory combiningFactory = new CombiningFirehoseFactory(
Arrays.<FirehoseFactory>asList(
new ListFirehoseFactory(
list1
), new ListFirehoseFactory(list2)
)
);
final Firehose firehose = combiningFactory.connect(null);
for (int i = 1; i < 5; i++) {
Assert.assertTrue(firehose.hasMore());
final InputRow inputRow = firehose.nextRow();
Assert.assertEquals(i, inputRow.getTimestampFromEpoch());
Assert.assertEquals(i, inputRow.getFloatMetric("test"), 0);
}
Assert.assertFalse(firehose.hasMore());
}
private InputRow makeRow(final long timestamp, final float metricValue)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return metricValue;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
};
}
public static class ListFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private final List<InputRow> rows;
ListFirehoseFactory(List<InputRow> rows)
{
this.rows = rows;
}
@Override
public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
{
final Iterator<InputRow> iterator = rows.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return iterator.hasNext();
}
@Override
public InputRow nextRow()
{
return iterator.next();
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
//
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}
}