CombiningFirehoseFactory

Adds CombiningFirehoseFactory, which can be used to ingest data from multiple sources.
This commit is contained in:
nishantmonu51 2014-07-08 21:14:48 +05:30
parent 97b58eb193
commit a1a5e4254f
3 changed files with 275 additions and 1 deletions

View File

@ -26,6 +26,7 @@ import com.google.inject.Binder;
import io.druid.data.input.ProtoBufInputRowParser;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@ -53,7 +54,8 @@ public class FirehoseModule implements DruidModule
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining")
)
);
}

View File

@ -0,0 +1,125 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * Creates a firehose that combines data from different Firehoses. Useful for ingesting data from multiple sources.
 *
 * The delegate factories are consumed strictly in list order: a delegate is connected only once the
 * previous delegate's firehose reports that it has no more rows, and the previous firehose is closed
 * before the next one is connected. Delegates that yield no rows at all are skipped.
 */
public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
{
  private final List<FirehoseFactory> delegateFactoryList;

  /**
   * @param delegateFactoryList factories whose firehoses are read one after another, in list order;
   *                            must be non-null and contain at least one factory
   *
   * @throws IllegalArgumentException if the list is null or empty
   */
  @JsonCreator
  public CombiningFirehoseFactory(
      @JsonProperty("delegates") List<FirehoseFactory> delegateFactoryList
  )
  {
    // A missing "delegates" property arrives here as null; report it as a bad argument
    // instead of failing with a bare NullPointerException from isEmpty().
    Preconditions.checkArgument(
        delegateFactoryList != null && !delegateFactoryList.isEmpty(),
        "delegates must contain at least one firehose factory"
    );
    this.delegateFactoryList = delegateFactoryList;
  }

  /**
   * Connects to the first delegate and returns a firehose that walks through all delegates in order.
   *
   * @param parser parser handed unchanged to every delegate factory's connect() call
   */
  @Override
  public Firehose connect(InputRowParser parser) throws IOException
  {
    return new CombiningFirehose(parser);
  }

  /**
   * @return the parser of the first delegate factory; the parsers of the other delegates are not consulted
   */
  @Override
  public InputRowParser getParser()
  {
    return delegateFactoryList.get(0).getParser();
  }

  @JsonProperty("delegates")
  public List<FirehoseFactory> getDelegateFactoryList()
  {
    return delegateFactoryList;
  }

  /**
   * Firehose that reads the delegates of the enclosing factory sequentially.
   */
  public class CombiningFirehose implements Firehose
  {
    private final InputRowParser parser;
    private final Iterator<FirehoseFactory> firehoseFactoryIterator;
    private volatile Firehose currentFirehose;

    public CombiningFirehose(InputRowParser parser) throws IOException
    {
      this.firehoseFactoryIterator = delegateFactoryList.iterator();
      this.parser = parser;
      // The enclosing factory guarantees a non-empty list, so this always connects a first delegate.
      nextFirehose();
    }

    /**
     * Closes the current delegate firehose (if any) and connects the next one.
     * Does nothing when no delegate factories remain.
     */
    private void nextFirehose()
    {
      if (firehoseFactoryIterator.hasNext()) {
        try {
          if (currentFirehose != null) {
            currentFirehose.close();
          }
          currentFirehose = firehoseFactoryIterator.next().connect(parser);
        }
        catch (IOException e) {
          // propagate() always throws; the explicit throw makes that visible to readers and the compiler.
          throw Throwables.propagate(e);
        }
      }
    }

    /**
     * Moves forward past exhausted delegates until one with pending rows is current.
     *
     * @return true if the current delegate has more rows, false if every delegate is exhausted
     */
    private boolean advanceToFirehoseWithRows()
    {
      while (!currentFirehose.hasMore()) {
        if (!firehoseFactoryIterator.hasNext()) {
          return false;
        }
        nextFirehose();
      }
      return true;
    }

    @Override
    public boolean hasMore()
    {
      // Checking only the current delegate would end the stream early when a delegate
      // (including the very first one) is empty while later delegates still hold rows.
      return advanceToFirehoseWithRows();
    }

    @Override
    public InputRow nextRow()
    {
      InputRow rv = currentFirehose.nextRow();
      // Switch eagerly once the current delegate is drained, skipping any empty delegates that follow.
      advanceToFirehoseWithRows();
      return rv;
    }

    /**
     * @return the commit runnable of whichever delegate firehose is current at the time of the call
     */
    @Override
    public Runnable commit()
    {
      return currentFirehose.commit();
    }

    @Override
    public void close() throws IOException
    {
      currentFirehose.close();
    }
  }
}

View File

@ -0,0 +1,147 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.realtime.firehose;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.utils.Runnables;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class CombiningFirehoseFactoryTest
{
  /**
   * Combines two list-backed delegates holding rows 1-2 and 3-4 and verifies that the combined
   * firehose yields rows 1 through 4 in order and then reports that it is exhausted.
   */
  @Test
  public void testCombiningfirehose() throws IOException
  {
    final List<InputRow> firstBatch = Arrays.asList(makeRow(1, 1), makeRow(2, 2));
    final List<InputRow> secondBatch = Arrays.asList(makeRow(3, 3), makeRow(4, 4));
    final List<FirehoseFactory> delegates = Arrays.<FirehoseFactory>asList(
        new ListFirehoseFactory(firstBatch),
        new ListFirehoseFactory(secondBatch)
    );
    final FirehoseFactory combiningFactory = new CombiningFirehoseFactory(delegates);
    final Firehose combined = combiningFactory.connect(null);

    int expected = 1;
    while (expected < 5) {
      Assert.assertTrue(combined.hasMore());
      final InputRow row = combined.nextRow();
      Assert.assertEquals(expected, row.getTimestampFromEpoch());
      Assert.assertEquals(expected, row.getFloatMetric("test"), 0);
      expected++;
    }
    Assert.assertFalse(combined.hasMore());
  }

  /**
   * Builds a stub row that reports the given timestamp and returns the given value for every metric.
   */
  private InputRow makeRow(final long timestamp, final float metricValue)
  {
    return new InputRow()
    {
      @Override
      public long getTimestampFromEpoch()
      {
        return timestamp;
      }

      @Override
      public float getFloatMetric(String metric)
      {
        return metricValue;
      }

      @Override
      public List<String> getDimensions()
      {
        return Arrays.asList("testDim");
      }

      @Override
      public List<String> getDimension(String dimension)
      {
        final List<String> noValues = Lists.newArrayList();
        return noValues;
      }

      @Override
      public Object getRaw(String dimension)
      {
        return null;
      }
    };
  }

  /**
   * Factory whose firehoses replay a fixed list of rows; the parser argument is ignored.
   */
  public static class ListFirehoseFactory implements FirehoseFactory<InputRowParser>
  {
    private final List<InputRow> rows;

    ListFirehoseFactory(List<InputRow> rows)
    {
      this.rows = rows;
    }

    @Override
    public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
    {
      return new ListFirehose(rows.iterator());
    }

    @Override
    public InputRowParser getParser()
    {
      return null;
    }

    /**
     * Firehose view over an iterator of pre-built rows.
     */
    private static class ListFirehose implements Firehose
    {
      private final Iterator<InputRow> remaining;

      ListFirehose(Iterator<InputRow> remaining)
      {
        this.remaining = remaining;
      }

      @Override
      public boolean hasMore()
      {
        return remaining.hasNext();
      }

      @Override
      public InputRow nextRow()
      {
        return remaining.next();
      }

      @Override
      public Runnable commit()
      {
        return Runnables.getNoopRunnable();
      }

      @Override
      public void close() throws IOException
      {
        // nothing to release
      }
    }
  }
}