diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java index ed9f628452c..b8609453f3d 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.initialization.DruidModule; import java.util.List; diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index aacc217e208..ac10c297dd4 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -26,6 +26,7 @@ import com.google.inject.Binder; import io.druid.data.input.ProtoBufInputRowParser; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; @@ -52,9 +53,8 @@ public class FirehoseModule implements DruidModule new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(LocalFirehoseFactory.class, "local"), - new NamedType(ProtoBufInputRowParser.class, "protobuf") + new NamedType(EventReceiverFirehoseFactory.class, "receiver") ) ); } - } diff --git a/server/src/main/java/io/druid/guice/ParsersModule.java b/server/src/main/java/io/druid/guice/ParsersModule.java new file mode 100644 index 00000000000..7c9d76c30e1 --- /dev/null +++ b/server/src/main/java/io/druid/guice/ParsersModule.java @@ -0,0 +1,51 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.data.input.ProtoBufInputRowParser; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +/** + */ +public class ParsersModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("ParsersModule") + .registerSubtypes( + new NamedType(ProtoBufInputRowParser.class, "protobuf") + ) + ); + } +} diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 66748fdf5d0..292aa3c95cf 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -47,6 +47,7 @@ import io.druid.guice.JacksonConfigManagerModule; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LifecycleModule; import io.druid.guice.LocalDataStorageDruidModule; +import io.druid.guice.ParsersModule; import io.druid.guice.QueryRunnerFactoryModule; import io.druid.guice.QueryableModule; import io.druid.guice.ServerModule; @@ -327,7 +328,8 @@ public class Initialization new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), new LocalDataStorageDruidModule(), - new FirehoseModule() + new FirehoseModule(), + new ParsersModule() ); ModuleList actualModules = new ModuleList(baseInjector); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandler.java b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandler.java similarity index 96% rename from indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandler.java rename to server/src/main/java/io/druid/segment/realtime/firehose/ChatHandler.java index f10ed444b9a..898bc5cb25e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandler.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandler.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; /** * Objects that can be registered with a {@link ServiceAnnouncingChatHandlerProvider} and provide http endpoints for indexing-related diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java similarity index 96% rename from indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java rename to server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java index 01af5c3cc62..f8ba66af1af 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.google.common.base.Optional; diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerResource.java similarity index 90% rename from indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java rename to server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerResource.java index c52745c5d58..a1d26b56dfd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerResource.java @@ -17,12 +17,12 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.worker.executor; +package io.druid.segment.realtime.firehose; import com.google.common.base.Optional; import com.google.inject.Inject; -import io.druid.indexing.common.index.ChatHandler; -import io.druid.indexing.common.index.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; import javax.ws.rs.Path; import javax.ws.rs.PathParam; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiver.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiver.java similarity index 95% rename from indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiver.java rename to server/src/main/java/io/druid/segment/realtime/firehose/EventReceiver.java index 9454fa7c2f4..491f7d37f7a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiver.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiver.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import java.util.Collection; import java.util.Map; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java similarity index 91% rename from indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java rename to server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index 31716232207..858a1aaf84c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; @@ -32,7 +32,10 @@ import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; +import io.druid.data.input.Rows; import io.druid.data.input.impl.MapInputRowParser; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -48,10 +51,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** - * Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these - * firehoses with an {@link ServiceAnnouncingChatHandlerProvider}. + * Builds firehoses that accept events through the {@link io.druid.segment.realtime.firehose.EventReceiver} interface. Can also register these + * firehoses with an {@link io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider}. */ -@JsonTypeName("receiver") public class EventReceiverFirehoseFactory implements FirehoseFactory { private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class); @@ -138,7 +140,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory final List rows = Lists.newArrayList(); for (final Map event : events) { // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer. - rows.add(parser.parse(event)); + InputRow row = parser.parse(event); + rows.add(Rows.toCaseInsensitiveInputRow(row,row.getDimensions())); } try { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java similarity index 96% rename from indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java rename to server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java index e75cce88e28..ae4bd3cbb95 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.google.common.base.Optional; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/ServiceAnnouncingChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java similarity index 98% rename from indexing-service/src/main/java/io/druid/indexing/common/index/ServiceAnnouncingChatHandlerProvider.java rename to server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java index 5b24ac94943..c96e47ccb31 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/ServiceAnnouncingChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.google.common.base.Optional; import com.google.common.collect.Maps; diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index d0867b42aa6..e3356012358 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -35,7 +35,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; -import io.druid.indexing.common.index.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.indexing.overlord.ForkingTaskRunner; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.worker.Worker; diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 9bfb75f519d..9f95feb43ad 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -45,7 +45,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskStorageConfig; -import io.druid.indexing.common.index.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; import io.druid.indexing.overlord.DbTaskStorage; diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 7204e5fc63a..d4eb0228c9a 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -48,15 +48,15 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; -import io.druid.indexing.common.index.ChatHandlerProvider; -import io.druid.indexing.common.index.NoopChatHandlerProvider; -import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerDBCoordinator; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.ThreadPoolTaskRunner; -import io.druid.indexing.worker.executor.ChatHandlerResource; +import io.druid.segment.realtime.firehose.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.query.QuerySegmentWalker; diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 276f850049a..7bcce15823a 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -31,6 +31,10 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.server.QueryResource; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -57,6 +61,20 @@ public class RealtimeModule implements Module publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class); binder.bind(DbSegmentPublisher.class).in(LazySingleton.class); + PolyBind.createChoice( + binder, + "druid.realtime.chathandler.type", + Key.get(ChatHandlerProvider.class), + Key.get(NoopChatHandlerProvider.class) + ); + final MapBinder handlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(ChatHandlerProvider.class) + ); + handlerProviderBinder.addBinding("announce") + .to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class); + handlerProviderBinder.addBinding("noop") + .to(NoopChatHandlerProvider.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); binder.bind(new TypeLiteral>(){}) .toProvider(FireDepartmentsProvider.class) @@ -66,6 +84,7 @@ public class RealtimeModule implements Module binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime")); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, ChatHandlerResource.class); LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, Server.class);