Merger: Replace EventReceivers with ChatHandlers

This allows task-related objects to create any http endpoint their little
hearts desire.
This commit is contained in:
Gian Merlino 2013-05-06 14:45:48 -07:00
parent b32a728863
commit d9b72e314e
12 changed files with 173 additions and 163 deletions

View File

@ -0,0 +1,6 @@
package com.metamx.druid.merger.common.index;
public interface ChatHandler
{
public String getHandlerId();
}

View File

@ -0,0 +1,83 @@
package com.metamx.druid.merger.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import java.util.concurrent.ConcurrentMap;
/**
* Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method
* allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded
* {@link ServiceAnnouncer} will be used to advertise handlers on this host.
*/
public class ChatHandlerProvider
{
private static final Logger log = new Logger(ChatHandlerProvider.class);
private final ChatHandlerProviderConfig config;
private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, ChatHandler> handlers;
public ChatHandlerProvider(
ChatHandlerProviderConfig config,
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.serviceAnnouncer = serviceAnnouncer;
this.handlers = Maps.newConcurrentMap();
}
public void register(final String key, ChatHandler handler)
{
final String service = serviceName(key);
log.info("Registering Eventhandler: %s", key);
if (handlers.putIfAbsent(key, handler) != null) {
throw new ISE("handler already registered for key: %s", key);
}
try {
serviceAnnouncer.announce(service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
handlers.remove(key, handler);
}
}
public void unregister(final String key)
{
final String service = serviceName(key);
log.info("Unregistering chat handler: %s", key);
final ChatHandler handler = handlers.get(key);
if (handler == null) {
log.warn("handler not currently registered, ignoring: %s", key);
}
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
handlers.remove(key, handler);
}
public Optional<ChatHandler> get(final String key)
{
return Optional.fromNullable(handlers.get(key));
}
private String serviceName(String key)
{
return String.format(config.getServiceFormat(), key);
}
}

View File

@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.druid.indexer.data.MapInputRowParser;
import com.metamx.druid.input.InputRow;
@ -14,6 +15,10 @@ import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@ -25,7 +30,7 @@ import java.util.concurrent.TimeUnit;
/**
* Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
* firehoses with an {@link EventReceiverProvider}.
* firehoses with an {@link ChatHandlerProvider}.
*/
@JsonTypeName("receiver")
public class EventReceiverFirehoseFactory implements FirehoseFactory
@ -36,20 +41,20 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
private final String firehoseId;
private final int bufferSize;
private final MapInputRowParser parser;
private final Optional<EventReceiverProvider> eventReceiverProvider;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("firehoseId") String firehoseId,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser,
@JacksonInject("eventReceiverProvider") EventReceiverProvider eventReceiverProvider
@JacksonInject("chatHandlerProvider") ChatHandlerProvider chatHandlerProvider
)
{
this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser");
this.eventReceiverProvider = Optional.fromNullable(eventReceiverProvider);
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
}
@Override
@ -59,8 +64,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
final EventReceiverFirehose firehose = new EventReceiverFirehose();
if (eventReceiverProvider.isPresent()) {
eventReceiverProvider.get().register(firehoseId, firehose);
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().register(firehoseId, firehose);
}
return firehose;
@ -84,7 +89,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
return parser;
}
public class EventReceiverFirehose implements EventReceiver, Firehose
public class EventReceiverFirehose implements ChatHandler, Firehose
{
private final BlockingQueue<InputRow> buffer;
private final Object readLock = new Object();
@ -97,7 +102,15 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
}
@Override
public void addAll(Collection<Map<String, Object>> events)
public String getHandlerId()
{
return firehoseId;
}
@POST
@Path("/push-events")
@Produces("application/json")
public Response addAll(Collection<Map<String, Object>> events)
{
log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId);
@ -118,6 +131,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
throw new IllegalStateException("Cannot add events to closed firehose!");
}
}
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
@ -176,8 +191,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
log.info("Firehose closing.");
closed = true;
if (eventReceiverProvider.isPresent()) {
eventReceiverProvider.get().unregister(firehoseId);
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(firehoseId);
}
}
}

View File

@ -1,84 +0,0 @@
package com.metamx.druid.merger.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig;
import org.apache.curator.x.discovery.ServiceDiscovery;
import java.util.concurrent.ConcurrentMap;
/**
* Provides a link between an {@link EventReceiver} and users. The {@link #get(String)} method allows anyone with a
* reference to this object to obtain an event receiver with a particular name. An embedded {@link ServiceDiscovery}
* instance, if provided, will be used to advertise event receivers on this host.
*/
public class EventReceiverProvider
{
private static final Logger log = new Logger(EventReceiverProvider.class);
private final EventReceiverProviderConfig config;
private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, EventReceiver> receivers;
public EventReceiverProvider(
EventReceiverProviderConfig config,
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.serviceAnnouncer = serviceAnnouncer;
this.receivers = Maps.newConcurrentMap();
}
public void register(final String key, EventReceiver receiver)
{
final String service = serviceName(key);
log.info("Registering EventReceiver: %s", key);
if (receivers.putIfAbsent(key, receiver) != null) {
throw new ISE("Receiver already registered for key: %s", key);
}
try {
serviceAnnouncer.announce(service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
receivers.remove(key, receiver);
}
}
public void unregister(final String key)
{
final String service = serviceName(key);
log.info("Unregistering event receiver: %s", key);
final EventReceiver receiver = receivers.get(key);
if (receiver == null) {
log.warn("Receiver not currently registered, ignoring: %s", key);
}
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
receivers.remove(key, receiver);
}
public Optional<EventReceiver> get(final String key)
{
return Optional.fromNullable(receivers.get(key));
}
private String serviceName(String key)
{
return String.format(config.getServiceFormat(), key);
}
}

View File

@ -478,7 +478,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
injectables.addValue("s3Client", null)
.addValue("segmentPusher", null)
.addValue("eventReceiverProvider", null);
.addValue("chatHandlerProvider", null);
getJsonMapper().setInjectableValues(injectables);
}

View File

@ -3,9 +3,9 @@ package com.metamx.druid.merger.worker.config;
import org.skife.config.Config;
import org.skife.config.DefaultNull;
public abstract class EventReceiverProviderConfig
public abstract class ChatHandlerProviderConfig
{
@Config("druid.indexer.eventreceiver.service")
@Config("druid.indexer.chathandler.service")
@DefaultNull
public abstract String getServiceFormat();

View File

@ -0,0 +1,39 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.metamx.druid.merger.common.index.ChatHandler;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
@Path("/mmx/worker/v1")
public class ChatHandlerResource
{
private final ObjectMapper jsonMapper;
private final ChatHandlerProvider handlers;
@Inject
public ChatHandlerResource(ObjectMapper jsonMapper, ChatHandlerProvider handlers)
{
this.jsonMapper = jsonMapper;
this.handlers = handlers;
}
@Path("/chat/{id}")
public Object doTaskChat(
@PathParam("id") String handlerId
)
{
final Optional<ChatHandler> handler = handlers.get(handlerId);
if (handler.isPresent()) {
return handler.get();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
}

View File

@ -1,48 +0,0 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.druid.merger.common.index.EventReceiver;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
@Path("/mmx/worker/v1")
public class EventReceiverResource
{
private final ObjectMapper jsonMapper;
private final EventReceiverProvider receivers;
@Inject
public EventReceiverResource(ObjectMapper jsonMapper, EventReceiverProvider receivers)
{
this.jsonMapper = jsonMapper;
this.receivers = receivers;
}
@POST
@Path("/firehose/{id}/push-events")
@Produces("application/json")
public Response doPush(
@PathParam("id") String firehoseId,
List<Map<String, Object>> events
)
{
final Optional<EventReceiver> receiver = receivers.get(firehoseId);
if (receiver.isPresent()) {
receiver.get().addAll(events);
return Response.ok(ImmutableMap.of("eventCount", events.size())).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
}

View File

@ -11,7 +11,6 @@ import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.emitter.EmittingLogger;

View File

@ -57,10 +57,10 @@ import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@ -120,7 +120,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private Server server = null;
private ExecutorServiceTaskRunner taskRunner = null;
private ExecutorLifecycle executorLifecycle = null;
private EventReceiverProvider eventReceiverProvider = null;
private ChatHandlerProvider chatHandlerProvider = null;
public ExecutorNode(
String nodeType,
@ -194,7 +194,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
initializeDataSegmentPusher();
initializeTaskToolbox();
initializeTaskRunner();
initializeEventReceiverProvider();
initializeChatHandlerProvider();
initializeJacksonInjections();
initializeJacksonSubtypes();
initializeServer();
@ -212,7 +212,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
final Injector injector = Guice.createInjector(
new ExecutorServletModule(
getJsonMapper(),
eventReceiverProvider
chatHandlerProvider
)
);
final Context root = new Context(server, "/", Context.SESSIONS);
@ -289,7 +289,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", segmentPusher)
.addValue("eventReceiverProvider", eventReceiverProvider);
.addValue("chatHandlerProvider", chatHandlerProvider);
getJsonMapper().setInjectableValues(injectables);
}
@ -426,18 +426,18 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
}
}
public void initializeEventReceiverProvider()
public void initializeChatHandlerProvider()
{
if (eventReceiverProvider == null) {
final EventReceiverProviderConfig config = configFactory.build(EventReceiverProviderConfig.class);
if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
final ServiceAnnouncer myServiceAnnouncer;
if (config.getServiceFormat() == null) {
log.info("EventReceiverProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
myServiceAnnouncer = new NoopServiceAnnouncer();
} else {
myServiceAnnouncer = serviceAnnouncer;
}
this.eventReceiverProvider = new EventReceiverProvider(
this.chatHandlerProvider = new ChatHandlerProvider(
config,
myServiceAnnouncer
);

View File

@ -3,7 +3,7 @@ package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -12,11 +12,11 @@ import javax.inject.Singleton;
public class ExecutorServletModule extends JerseyServletModule
{
private final ObjectMapper jsonMapper;
private final EventReceiverProvider receivers;
private final ChatHandlerProvider receivers;
public ExecutorServletModule(
ObjectMapper jsonMapper,
EventReceiverProvider receivers
ChatHandlerProvider receivers
)
{
this.jsonMapper = jsonMapper;
@ -26,9 +26,9 @@ public class ExecutorServletModule extends JerseyServletModule
@Override
protected void configureServlets()
{
bind(EventReceiverResource.class);
bind(ChatHandlerResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(EventReceiverProvider.class).toInstance(receivers);
bind(ChatHandlerProvider.class).toInstance(receivers);
serve("/*").with(GuiceContainer.class);
}

View File

@ -267,7 +267,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
injectables.addValue("s3Client", null)
.addValue("segmentPusher", null)
.addValue("eventReceiverProvider", null);
.addValue("chatHandlerProvider", null);
getJsonMapper().setInjectableValues(injectables);
}