From d9b72e314e41c8885fdb7a18999ba56c7af5d44b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 6 May 2013 14:45:48 -0700 Subject: [PATCH] Merger: Replace EventReceivers with ChatHandlers This allows task-related objects to create any http endpoint their little hearts desire. --- .../merger/common/index/ChatHandler.java | 6 ++ .../common/index/ChatHandlerProvider.java | 83 ++++++++++++++++++ .../index/EventReceiverFirehoseFactory.java | 35 +++++--- .../common/index/EventReceiverProvider.java | 84 ------------------- .../http/IndexerCoordinatorNode.java | 2 +- ...ig.java => ChatHandlerProviderConfig.java} | 4 +- .../worker/executor/ChatHandlerResource.java | 39 +++++++++ .../executor/EventReceiverResource.java | 48 ----------- .../worker/executor/ExecutorLifecycle.java | 1 - .../merger/worker/executor/ExecutorNode.java | 22 ++--- .../executor/ExecutorServletModule.java | 10 +-- .../druid/merger/worker/http/WorkerNode.java | 2 +- 12 files changed, 173 insertions(+), 163 deletions(-) create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandler.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandlerProvider.java delete mode 100644 merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java rename merger/src/main/java/com/metamx/druid/merger/worker/config/{EventReceiverProviderConfig.java => ChatHandlerProviderConfig.java} (75%) create mode 100644 merger/src/main/java/com/metamx/druid/merger/worker/executor/ChatHandlerResource.java delete mode 100644 merger/src/main/java/com/metamx/druid/merger/worker/executor/EventReceiverResource.java diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandler.java b/merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandler.java new file mode 100644 index 00000000000..59269e6d137 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandler.java @@ -0,0 +1,6 @@ +package com.metamx.druid.merger.common.index; + +public interface ChatHandler +{ + public String getHandlerId(); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandlerProvider.java b/merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandlerProvider.java new file mode 100644 index 00000000000..53ce944f92f --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/ChatHandlerProvider.java @@ -0,0 +1,83 @@ +package com.metamx.druid.merger.common.index; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig; + +import java.util.concurrent.ConcurrentMap; + +/** + * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method + * allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded + * {@link ServiceAnnouncer} will be used to advertise handlers on this host. + */ +public class ChatHandlerProvider +{ + private static final Logger log = new Logger(ChatHandlerProvider.class); + + private final ChatHandlerProviderConfig config; + private final ServiceAnnouncer serviceAnnouncer; + private final ConcurrentMap handlers; + + public ChatHandlerProvider( + ChatHandlerProviderConfig config, + ServiceAnnouncer serviceAnnouncer + ) + { + this.config = config; + this.serviceAnnouncer = serviceAnnouncer; + this.handlers = Maps.newConcurrentMap(); + } + + public void register(final String key, ChatHandler handler) + { + final String service = serviceName(key); + log.info("Registering Eventhandler: %s", key); + + if (handlers.putIfAbsent(key, handler) != null) { + throw new ISE("handler already registered for key: %s", key); + } + + try { + serviceAnnouncer.announce(service); + } + catch (Exception e) { + log.warn(e, "Failed to register service: %s", service); + handlers.remove(key, handler); + } + } + + public void unregister(final String key) + { + final String service = serviceName(key); + + log.info("Unregistering chat handler: %s", key); + + final ChatHandler handler = handlers.get(key); + if (handler == null) { + log.warn("handler not currently registered, ignoring: %s", key); + } + + try { + serviceAnnouncer.unannounce(service); + } + catch (Exception e) { + log.warn(e, "Failed to unregister service: %s", service); + } + + handlers.remove(key, handler); + } + + public Optional get(final String key) + { + return Optional.fromNullable(handlers.get(key)); + } + + private String serviceName(String key) + { + return String.format(config.getServiceFormat(), key); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java index d2376e5d0c2..4f3271ca6bc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.druid.indexer.data.MapInputRowParser; import com.metamx.druid.input.InputRow; @@ -14,6 +15,10 @@ import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.emitter.EmittingLogger; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -25,7 +30,7 @@ import java.util.concurrent.TimeUnit; /** * Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these - * firehoses with an {@link EventReceiverProvider}. + * firehoses with an {@link ChatHandlerProvider}. */ @JsonTypeName("receiver") public class EventReceiverFirehoseFactory implements FirehoseFactory @@ -36,20 +41,20 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory private final String firehoseId; private final int bufferSize; private final MapInputRowParser parser; - private final Optional eventReceiverProvider; + private final Optional chatHandlerProvider; @JsonCreator public EventReceiverFirehoseFactory( @JsonProperty("firehoseId") String firehoseId, @JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("parser") MapInputRowParser parser, - @JacksonInject("eventReceiverProvider") EventReceiverProvider eventReceiverProvider + @JacksonInject("chatHandlerProvider") ChatHandlerProvider chatHandlerProvider ) { this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId"); this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.parser = Preconditions.checkNotNull(parser, "parser"); - this.eventReceiverProvider = Optional.fromNullable(eventReceiverProvider); + this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); } @Override @@ -59,8 +64,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory final EventReceiverFirehose firehose = new EventReceiverFirehose(); - if (eventReceiverProvider.isPresent()) { - eventReceiverProvider.get().register(firehoseId, firehose); + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().register(firehoseId, firehose); } return firehose; @@ -84,7 +89,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory return parser; } - public class EventReceiverFirehose implements EventReceiver, Firehose + public class EventReceiverFirehose implements ChatHandler, Firehose { private final BlockingQueue buffer; private final Object readLock = new Object(); @@ -97,7 +102,15 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory } @Override - public void addAll(Collection> events) + public String getHandlerId() + { + return firehoseId; + } + + @POST + @Path("/push-events") + @Produces("application/json") + public Response addAll(Collection> events) { log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId); @@ -118,6 +131,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory throw new IllegalStateException("Cannot add events to closed firehose!"); } } + + return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Throwables.propagate(e); @@ -176,8 +191,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory log.info("Firehose closing."); closed = true; - if (eventReceiverProvider.isPresent()) { - eventReceiverProvider.get().unregister(firehoseId); + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().unregister(firehoseId); } } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java deleted file mode 100644 index ce46a7b37dd..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.metamx.druid.merger.common.index; - -import com.google.common.base.Optional; -import com.google.common.collect.Maps; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig; -import org.apache.curator.x.discovery.ServiceDiscovery; - -import java.util.concurrent.ConcurrentMap; - -/** - * Provides a link between an {@link EventReceiver} and users. The {@link #get(String)} method allows anyone with a - * reference to this object to obtain an event receiver with a particular name. An embedded {@link ServiceDiscovery} - * instance, if provided, will be used to advertise event receivers on this host. - */ -public class EventReceiverProvider -{ - private static final Logger log = new Logger(EventReceiverProvider.class); - - private final EventReceiverProviderConfig config; - private final ServiceAnnouncer serviceAnnouncer; - private final ConcurrentMap receivers; - - public EventReceiverProvider( - EventReceiverProviderConfig config, - ServiceAnnouncer serviceAnnouncer - ) - { - this.config = config; - this.serviceAnnouncer = serviceAnnouncer; - this.receivers = Maps.newConcurrentMap(); - } - - public void register(final String key, EventReceiver receiver) - { - final String service = serviceName(key); - log.info("Registering EventReceiver: %s", key); - - if (receivers.putIfAbsent(key, receiver) != null) { - throw new ISE("Receiver already registered for key: %s", key); - } - - try { - serviceAnnouncer.announce(service); - } - catch (Exception e) { - log.warn(e, "Failed to register service: %s", service); - receivers.remove(key, receiver); - } - } - - public void unregister(final String key) - { - final String service = serviceName(key); - - log.info("Unregistering event receiver: %s", key); - - final EventReceiver receiver = receivers.get(key); - if (receiver == null) { - log.warn("Receiver not currently registered, ignoring: %s", key); - } - - try { - serviceAnnouncer.unannounce(service); - } - catch (Exception e) { - log.warn(e, "Failed to unregister service: %s", service); - } - - receivers.remove(key, receiver); - } - - public Optional get(final String key) - { - return Optional.fromNullable(receivers.get(key)); - } - - private String serviceName(String key) - { - return String.format(config.getServiceFormat(), key); - } -} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 8a854499cfa..8a2923431ec 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -478,7 +478,7 @@ public class IndexerCoordinatorNode extends QueryableNode handler = handlers.get(handlerId); + + if (handler.isPresent()) { + return handler.get(); + } else { + return Response.status(Response.Status.NOT_FOUND).build(); + } + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/EventReceiverResource.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/EventReceiverResource.java deleted file mode 100644 index 716fed0962e..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/EventReceiverResource.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.metamx.druid.merger.worker.executor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Inject; -import com.metamx.druid.merger.common.index.EventReceiver; -import com.metamx.druid.merger.common.index.EventReceiverProvider; - -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Response; -import java.util.List; -import java.util.Map; - -@Path("/mmx/worker/v1") -public class EventReceiverResource -{ - private final ObjectMapper jsonMapper; - private final EventReceiverProvider receivers; - - @Inject - public EventReceiverResource(ObjectMapper jsonMapper, EventReceiverProvider receivers) - { - this.jsonMapper = jsonMapper; - this.receivers = receivers; - } - - @POST - @Path("/firehose/{id}/push-events") - @Produces("application/json") - public Response doPush( - @PathParam("id") String firehoseId, - List> events - ) - { - final Optional receiver = receivers.get(firehoseId); - - if (receiver.isPresent()) { - receiver.get().addAll(events); - return Response.ok(ImmutableMap.of("eventCount", events.size())).build(); - } else { - return Response.status(Response.Status.NOT_FOUND).build(); - } - } -} diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorLifecycle.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorLifecycle.java index 28dbe7b41c1..778021e7b42 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorLifecycle.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorLifecycle.java @@ -11,7 +11,6 @@ import com.metamx.common.ISE; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.index.EventReceiverProvider; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskRunner; import com.metamx.emitter.EmittingLogger; diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java index 59d7ad2a1b9..66ee23eb0df 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java @@ -57,10 +57,10 @@ import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory; import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory; -import com.metamx.druid.merger.common.index.EventReceiverProvider; +import com.metamx.druid.merger.common.index.ChatHandlerProvider; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; -import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig; +import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig; import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; @@ -120,7 +120,7 @@ public class ExecutorNode extends BaseServerNode private Server server = null; private ExecutorServiceTaskRunner taskRunner = null; private ExecutorLifecycle executorLifecycle = null; - private EventReceiverProvider eventReceiverProvider = null; + private ChatHandlerProvider chatHandlerProvider = null; public ExecutorNode( String nodeType, @@ -194,7 +194,7 @@ public class ExecutorNode extends BaseServerNode initializeDataSegmentPusher(); initializeTaskToolbox(); initializeTaskRunner(); - initializeEventReceiverProvider(); + initializeChatHandlerProvider(); initializeJacksonInjections(); initializeJacksonSubtypes(); initializeServer(); @@ -212,7 +212,7 @@ public class ExecutorNode extends BaseServerNode final Injector injector = Guice.createInjector( new ExecutorServletModule( getJsonMapper(), - eventReceiverProvider + chatHandlerProvider ) ); final Context root = new Context(server, "/", Context.SESSIONS); @@ -289,7 +289,7 @@ public class ExecutorNode extends BaseServerNode injectables.addValue("s3Client", s3Service) .addValue("segmentPusher", segmentPusher) - .addValue("eventReceiverProvider", eventReceiverProvider); + .addValue("chatHandlerProvider", chatHandlerProvider); getJsonMapper().setInjectableValues(injectables); } @@ -426,18 +426,18 @@ public class ExecutorNode extends BaseServerNode } } - public void initializeEventReceiverProvider() + public void initializeChatHandlerProvider() { - if (eventReceiverProvider == null) { - final EventReceiverProviderConfig config = configFactory.build(EventReceiverProviderConfig.class); + if (chatHandlerProvider == null) { + final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class); final ServiceAnnouncer myServiceAnnouncer; if (config.getServiceFormat() == null) { - log.info("EventReceiverProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); + log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); myServiceAnnouncer = new NoopServiceAnnouncer(); } else { myServiceAnnouncer = serviceAnnouncer; } - this.eventReceiverProvider = new EventReceiverProvider( + this.chatHandlerProvider = new ChatHandlerProvider( config, myServiceAnnouncer ); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorServletModule.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorServletModule.java index 72ad3171450..fa6ea39f2ba 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorServletModule.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorServletModule.java @@ -3,7 +3,7 @@ package com.metamx.druid.merger.worker.executor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.google.inject.Provides; -import com.metamx.druid.merger.common.index.EventReceiverProvider; +import com.metamx.druid.merger.common.index.ChatHandlerProvider; import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; @@ -12,11 +12,11 @@ import javax.inject.Singleton; public class ExecutorServletModule extends JerseyServletModule { private final ObjectMapper jsonMapper; - private final EventReceiverProvider receivers; + private final ChatHandlerProvider receivers; public ExecutorServletModule( ObjectMapper jsonMapper, - EventReceiverProvider receivers + ChatHandlerProvider receivers ) { this.jsonMapper = jsonMapper; @@ -26,9 +26,9 @@ public class ExecutorServletModule extends JerseyServletModule @Override protected void configureServlets() { - bind(EventReceiverResource.class); + bind(ChatHandlerResource.class); bind(ObjectMapper.class).toInstance(jsonMapper); - bind(EventReceiverProvider.class).toInstance(receivers); + bind(ChatHandlerProvider.class).toInstance(receivers); serve("/*").with(GuiceContainer.class); } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 10fb15755f2..9ef0a900de6 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -267,7 +267,7 @@ public class WorkerNode extends QueryableNode injectables.addValue("s3Client", null) .addValue("segmentPusher", null) - .addValue("eventReceiverProvider", null); + .addValue("chatHandlerProvider", null); getJsonMapper().setInjectableValues(injectables); }