Merge branch 'move-firehose' of github.com:metamx/druid into move-firehose

This commit is contained in:
fjy 2014-04-25 14:13:19 -07:00
commit 171d20d52d
4 changed files with 24 additions and 3 deletions

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.worker.executor; package io.druid.segment.realtime.firehose;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;

View File

@ -32,6 +32,7 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.MapInputRowParser;
import io.druid.segment.realtime.firehose.ChatHandler; import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -139,7 +140,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
final List<InputRow> rows = Lists.newArrayList(); final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) { for (final Map<String, Object> event : events) {
// Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer. // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
rows.add(parser.parse(event)); InputRow row = parser.parse(event);
rows.add(Rows.toCaseInsensitiveInputRow(row,row.getDimensions()));
} }
try { try {

View File

@ -56,7 +56,7 @@ import io.druid.indexing.overlord.IndexerDBCoordinator;
import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;

View File

@ -31,6 +31,10 @@ import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource; import io.druid.server.QueryResource;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -57,6 +61,20 @@ public class RealtimeModule implements Module
publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class); publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class);
binder.bind(DbSegmentPublisher.class).in(LazySingleton.class); binder.bind(DbSegmentPublisher.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.realtime.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("announce")
.to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){}) binder.bind(new TypeLiteral<List<FireDepartment>>(){})
.toProvider(FireDepartmentsProvider.class) .toProvider(FireDepartmentsProvider.class)
@ -66,6 +84,7 @@ public class RealtimeModule implements Module
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime")); binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class); Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, ChatHandlerResource.class);
LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, QueryResource.class);
LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, Server.class);