Merge pull request #459 from metamx/move-firehose

Move the event receiver firehose such that standalone realtime nodes can use it
This commit is contained in:
fjy 2014-05-05 17:09:43 -06:00
commit 544c0f08f6
15 changed files with 98 additions and 23 deletions

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.initialization.DruidModule;
import java.util.List;

View File

@ -26,6 +26,7 @@ import com.google.inject.Binder;
import io.druid.data.input.ProtoBufInputRowParser;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@ -52,9 +53,8 @@ public class FirehoseModule implements DruidModule
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(ProtoBufInputRowParser.class, "protobuf")
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.data.input.ProtoBufInputRowParser;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
/**
*/
public class ParsersModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("ParsersModule")
.registerSubtypes(
new NamedType(ProtoBufInputRowParser.class, "protobuf")
)
);
}
}

View File

@ -47,6 +47,7 @@ import io.druid.guice.JacksonConfigManagerModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LifecycleModule;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.guice.ParsersModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
@ -327,7 +328,8 @@ public class Initialization
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new LocalDataStorageDruidModule(),
new FirehoseModule()
new FirehoseModule(),
new ParsersModule()
);
ModuleList actualModules = new ModuleList(baseInjector);

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
/**
* Objects that can be registered with a {@link ServiceAnnouncingChatHandlerProvider} and provide http endpoints for indexing-related

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import com.google.common.base.Optional;

View File

@ -17,12 +17,12 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.worker.executor;
package io.druid.segment.realtime.firehose;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.druid.indexing.common.index.ChatHandler;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import java.util.Collection;
import java.util.Map;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -32,7 +32,10 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@ -48,10 +51,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
* firehoses with an {@link ServiceAnnouncingChatHandlerProvider}.
* Builds firehoses that accept events through the {@link io.druid.segment.realtime.firehose.EventReceiver} interface. Can also register these
* firehoses with an {@link io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider}.
*/
@JsonTypeName("receiver")
public class EventReceiverFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
@ -138,7 +140,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) {
// Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
rows.add(parser.parse(event));
InputRow row = parser.parse(event);
rows.add(Rows.toCaseInsensitiveInputRow(row,row.getDimensions()));
}
try {

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import com.google.common.base.Optional;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;

View File

@ -35,7 +35,7 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.indexing.overlord.ForkingTaskRunner;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.worker.Worker;

View File

@ -45,7 +45,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.overlord.DbTaskStorage;

View File

@ -48,15 +48,15 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerDBCoordinator;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.query.QuerySegmentWalker;

View File

@ -31,6 +31,10 @@ import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
@ -57,6 +61,20 @@ public class RealtimeModule implements Module
publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class);
binder.bind(DbSegmentPublisher.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.realtime.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("announce")
.to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
.toProvider(FireDepartmentsProvider.class)
@ -66,6 +84,7 @@ public class RealtimeModule implements Module
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, ChatHandlerResource.class);
LifecycleModule.register(binder, QueryResource.class);
LifecycleModule.register(binder, Server.class);