add a firehose module to remove so much copy and pasted code

This commit is contained in:
fjy 2013-09-30 16:29:20 -07:00
parent ed9e0cf9f6
commit f55a5199b1
8 changed files with 90 additions and 186 deletions

View File

@ -20,6 +20,7 @@
package io.druid.indexing.common.index;
import com.google.common.base.Throwables;
import io.druid.common.guava.Runnables;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
@ -77,8 +78,7 @@ public abstract class FileIteratingFirehose<T> implements Firehose
@Override
public Runnable commit()
{
// Do nothing.
return new Runnable() { public void run() {} };
return Runnables.getNoopRunnable();
}
@Override

View File

@ -19,16 +19,11 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.Jerseys;
@ -37,9 +32,6 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.coordinator.ForkingTaskRunner;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.worker.Worker;
@ -47,17 +39,10 @@ import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.indexing.worker.WorkerTaskMonitor;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.indexing.worker.http.WorkerResource;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.DruidNode;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
import java.util.Arrays;
import java.util.List;
/**
@ -79,7 +64,7 @@ public class CliMiddleManager extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -112,28 +97,6 @@ public class CliMiddleManager extends ServerRunnable
config.getVersion()
);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}
);
}

View File

@ -19,8 +19,6 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
@ -30,10 +28,6 @@ import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.JacksonConfigProvider;
@ -47,9 +41,6 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogs;
@ -78,12 +69,6 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.JettyServerInitializer;
@ -99,7 +84,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection;
import java.util.Arrays;
import java.util.List;
/**
@ -121,7 +105,7 @@ public class CliOverlord extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -215,28 +199,6 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}
);
}

View File

@ -105,7 +105,7 @@ public class CliPeon extends GuiceRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -158,28 +158,6 @@ public class CliPeon extends GuiceRunnable
LifecycleModule.register(binder, Server.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}
);
}

View File

@ -19,39 +19,23 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.RealtimeModule;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
@ -75,7 +59,7 @@ public class CliRealtimeExample extends ServerRunnable
{
return ImmutableList.<Object>of(
new RealtimeModule(),
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -86,28 +70,6 @@ public class CliRealtimeExample extends ServerRunnable
binder.bind(InventoryView.class).to(NoopInventoryView.class);
binder.bind(ServerView.class).to(NoopServerView.class);
}
@Override
public List<Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("RealtimeExampleModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}
);
}

View File

@ -40,6 +40,7 @@ import io.druid.guice.DataSegmentPusherPullerModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.FirehoseModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.JacksonConfigManagerModule;
@ -225,7 +226,8 @@ public class
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new DataSegmentPusherPullerModule(),
new TaskLogsModule()
new TaskLogsModule(),
new FirehoseModule()
);
ModuleList actualModules = new ModuleList(baseInjector);

View File

@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import java.util.Arrays;
import java.util.List;
/**
*/
public class FirehoseModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("FirehoseModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}

View File

@ -19,43 +19,26 @@
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.FileIteratingFirehose;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
import java.util.Arrays;
import java.util.List;
/**
*/
public class RealtimeModule implements DruidModule
public class RealtimeModule implements Module
{
@Override
public void configure(Binder binder)
@ -66,7 +49,10 @@ public class RealtimeModule implements DruidModule
Key.get(SegmentPublisher.class),
Key.get(NoopSegmentPublisher.class)
);
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class));
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(
binder,
Key.get(SegmentPublisher.class)
);
publisherBinder.addBinding("db").to(DbSegmentPublisher.class);
binder.bind(DbSegmentPublisher.class).in(LazySingleton.class);
@ -85,26 +71,4 @@ public class RealtimeModule implements DruidModule
LifecycleModule.register(binder, Server.class);
}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}