From f55a5199b14ed697726a168ed98ad3c2fd47584a Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 30 Sep 2013 16:29:20 -0700 Subject: [PATCH] add a firehose module to remove so much copy and pasted code --- .../common/index/FileIteratingFirehose.java | 4 +- .../java/io/druid/cli/CliMiddleManager.java | 41 +---------- .../main/java/io/druid/cli/CliOverlord.java | 40 +--------- .../src/main/java/io/druid/cli/CliPeon.java | 24 +----- .../java/io/druid/cli/CliRealtimeExample.java | 42 +---------- .../java/io/druid/cli/Initialization.java | 4 +- .../java/io/druid/guice/FirehoseModule.java | 73 +++++++++++++++++++ .../java/io/druid/guice/RealtimeModule.java | 48 ++---------- 8 files changed, 90 insertions(+), 186 deletions(-) create mode 100644 services/src/main/java/io/druid/guice/FirehoseModule.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java b/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java index 222009fd797..b6f1070d4bf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java @@ -20,6 +20,7 @@ package io.druid.indexing.common.index; import com.google.common.base.Throwables; +import io.druid.common.guava.Runnables; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.data.input.StringInputRowParser; @@ -77,8 +78,7 @@ public abstract class FileIteratingFirehose implements Firehose @Override public Runnable commit() { - // Do nothing. - return new Runnable() { public void run() {} }; + return Runnables.getNoopRunnable(); } @Override diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 6dd11175ef6..f2a0659b9e9 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -19,16 +19,11 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.Jerseys; @@ -37,9 +32,6 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.ForkingTaskRunner; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.worker.Worker; @@ -47,17 +39,10 @@ import io.druid.indexing.worker.WorkerCuratorCoordinator; import io.druid.indexing.worker.WorkerTaskMonitor; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.indexing.worker.http.WorkerResource; -import io.druid.initialization.DruidModule; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.DruidNode; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; -import java.util.Arrays; import java.util.List; /** @@ -79,7 +64,7 @@ public class CliMiddleManager extends ServerRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -112,28 +97,6 @@ public class CliMiddleManager extends ServerRunnable config.getVersion() ); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 3ea3c44a284..446283e0dee 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -19,8 +19,6 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Injector; @@ -30,10 +28,6 @@ import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import com.google.inject.servlet.GuiceFilter; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.JacksonConfigProvider; @@ -47,9 +41,6 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogs; @@ -78,12 +69,6 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; import io.druid.indexing.coordinator.setup.WorkerSetupData; -import io.druid.initialization.DruidModule; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectInfo; import io.druid.server.initialization.JettyServerInitializer; @@ -99,7 +84,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.util.resource.ResourceCollection; -import java.util.Arrays; import java.util.List; /** @@ -121,7 +105,7 @@ public class CliOverlord extends ServerRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -215,28 +199,6 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 3dcdfc38fe7..77e140bbd8f 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -105,7 +105,7 @@ public class CliPeon extends GuiceRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -158,28 +158,6 @@ public class CliPeon extends GuiceRunnable LifecycleModule.register(binder, Server.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 5b73be2bb51..07e862b1178 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -19,39 +19,23 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Module; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.client.DruidServer; import io.druid.client.InventoryView; import io.druid.client.ServerView; import io.druid.guice.NoopSegmentPublisherProvider; import io.druid.guice.RealtimeModule; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; -import io.druid.initialization.DruidModule; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.SegmentPublisher; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; @@ -75,7 +59,7 @@ public class CliRealtimeExample extends ServerRunnable { return ImmutableList.of( new RealtimeModule(), - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -86,28 +70,6 @@ public class CliRealtimeExample extends ServerRunnable binder.bind(InventoryView.class).to(NoopInventoryView.class); binder.bind(ServerView.class).to(NoopServerView.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeExampleModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/Initialization.java b/services/src/main/java/io/druid/cli/Initialization.java index 76f502b1563..a3d687bb70c 100644 --- a/services/src/main/java/io/druid/cli/Initialization.java +++ b/services/src/main/java/io/druid/cli/Initialization.java @@ -40,6 +40,7 @@ import io.druid.guice.DataSegmentPusherPullerModule; import io.druid.guice.DbConnectorModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidSecondaryModule; +import io.druid.guice.FirehoseModule; import io.druid.guice.HttpClientModule; import io.druid.guice.IndexingServiceDiscoveryModule; import io.druid.guice.JacksonConfigManagerModule; @@ -225,7 +226,8 @@ public class new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), new DataSegmentPusherPullerModule(), - new TaskLogsModule() + new TaskLogsModule(), + new FirehoseModule() ); ModuleList actualModules = new ModuleList(baseInjector); diff --git a/services/src/main/java/io/druid/guice/FirehoseModule.java b/services/src/main/java/io/druid/guice/FirehoseModule.java new file mode 100644 index 00000000000..5026d93e752 --- /dev/null +++ b/services/src/main/java/io/druid/guice/FirehoseModule.java @@ -0,0 +1,73 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; +import io.druid.initialization.DruidModule; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; + +import java.util.Arrays; +import java.util.List; + +/** + */ +public class FirehoseModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("FirehoseModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") + ) + ); + } +} diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 195439662de..c4fa8aa6461 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -19,43 +19,26 @@ package io.druid.guice; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Key; +import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.druid.cli.QueryJettyServerInitializer; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.FileIteratingFirehose; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; -import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; -import java.util.Arrays; import java.util.List; /** */ -public class RealtimeModule implements DruidModule +public class RealtimeModule implements Module { @Override public void configure(Binder binder) @@ -66,7 +49,10 @@ public class RealtimeModule implements DruidModule Key.get(SegmentPublisher.class), Key.get(NoopSegmentPublisher.class) ); - final MapBinder publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class)); + final MapBinder publisherBinder = PolyBind.optionBinder( + binder, + Key.get(SegmentPublisher.class) + ); publisherBinder.addBinding("db").to(DbSegmentPublisher.class); binder.bind(DbSegmentPublisher.class).in(LazySingleton.class); @@ -85,26 +71,4 @@ public class RealtimeModule implements DruidModule LifecycleModule.register(binder, Server.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } }