From 5d0d71250b68845966d89e89128146e6c234070f Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 1 Oct 2013 11:25:39 -0700 Subject: [PATCH] fix chat handler resources not correctly registering themselves --- .../common/index/ChatHandlerProvider.java | 2 ++ .../index/EventReceiverFirehoseFactory.java | 32 ++++++++++------- .../EventReceivingChatHandlerProvider.java | 7 +++- .../common/index/NoopChatHandlerProvider.java | 6 ++++ .../druid/indexing/common/task/NoopTask.java | 34 ++++++++++++++++--- .../coordinator/RemoteTaskRunner.java | 2 +- .../worker/executor/ChatHandlerResource.java | 2 ++ .../java/io/druid/cli/CliMiddleManager.java | 4 +++ .../main/java/io/druid/cli/CliOverlord.java | 9 ++++- .../src/main/java/io/druid/cli/CliPeon.java | 6 ++-- 10 files changed, 82 insertions(+), 22 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java index 01af5c3cc62..49e74df0717 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java @@ -25,6 +25,8 @@ import com.google.common.base.Optional; */ public interface ChatHandlerProvider { + public String getType(); + public void register(final String key, ChatHandler handler); public void unregister(final String key); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java index f26e0090953..8e66fd8f357 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java @@ -57,20 +57,20 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class); private static final int DEFAULT_BUFFER_SIZE = 100000; - private final String firehoseId; + private final String serviceName; private final int bufferSize; private final MapInputRowParser parser; - private final Optional chatHandlerProvider; + private final Optional chatHandlerProvider; @JsonCreator public EventReceiverFirehoseFactory( - @JsonProperty("firehoseId") String firehoseId, + @JsonProperty("serviceName") String serviceName, @JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("parser") MapInputRowParser parser, - @JacksonInject("chatHandlerProvider") EventReceivingChatHandlerProvider chatHandlerProvider + @JacksonInject ChatHandlerProvider chatHandlerProvider ) { - this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId"); + this.serviceName = Preconditions.checkNotNull(serviceName, "serviceName"); this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.parser = Preconditions.checkNotNull(parser, "parser"); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); @@ -79,21 +79,24 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - log.info("Connecting firehose: %s", firehoseId); + log.info("Connecting firehose: %s", serviceName); final EventReceiverFirehose firehose = new EventReceiverFirehose(); if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().register(firehoseId, firehose); + log.info("Found chathandler with type[%s]", chatHandlerProvider.get().getType()); + chatHandlerProvider.get().register(serviceName, firehose); + } else { + log.info("No chathandler detected"); } return firehose; } @JsonProperty - public String getFirehoseId() + public String getServiceName() { - return firehoseId; + return serviceName; } @JsonProperty @@ -111,7 +114,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory public class EventReceiverFirehose implements ChatHandler, Firehose { private final BlockingQueue buffer; + private final Object readLock = new Object(); + private volatile InputRow nextRow = null; private volatile boolean closed = false; @@ -125,7 +130,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory @Produces("application/json") public Response addAll(Collection> events) { - log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId); + log.debug("Adding %,d events to firehose: %s", events.size(), serviceName); final List rows = Lists.newArrayList(); for (final Map event : events) { @@ -146,7 +151,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory } return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Throwables.propagate(e); } @@ -167,7 +173,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory } return nextRow != null; - } + } } @Override @@ -205,7 +211,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory closed = true; if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(firehoseId); + chatHandlerProvider.get().unregister(serviceName); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java index 8fce0a3935a..93bdd2a53c6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java @@ -54,6 +54,12 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider this.handlers = Maps.newConcurrentMap(); } + @Override + public String getType() + { + return "eventReceiving"; + } + @Override public void register(final String service, ChatHandler handler) { @@ -76,7 +82,6 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider @Override public void unregister(final String service) { - log.info("Unregistering chat handler[%s]", service); final ChatHandler handler = handlers.get(service); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java index e75cce88e28..b0b2d9069a7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java @@ -25,6 +25,12 @@ import com.google.common.base.Optional; */ public class NoopChatHandlerProvider implements ChatHandlerProvider { + @Override + public String getType() + { + return "noop"; + } + @Override public void register(String key, ChatHandler handler) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 230f074db72..c6291fdb4c9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -22,6 +22,7 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.logger.Logger; +import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import org.joda.time.DateTime; @@ -33,11 +34,17 @@ import org.joda.time.Period; public class NoopTask extends AbstractTask { private static final Logger log = new Logger(NoopTask.class); + private static int defaultRunTime = 2500; + + private final int runTime; + private final FirehoseFactory firehoseFactory; @JsonCreator public NoopTask( @JsonProperty("id") String id, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("runTime") int runTime, + @JsonProperty("firehose") FirehoseFactory firehoseFactory ) { super( @@ -45,6 +52,10 @@ public class NoopTask extends AbstractTask "none", interval == null ? new Interval(Period.days(1), new DateTime()) : interval ); + + this.runTime = (runTime == 0) ? defaultRunTime : runTime; + + this.firehoseFactory = firehoseFactory; } @Override @@ -53,14 +64,29 @@ public class NoopTask extends AbstractTask return "noop"; } + @JsonProperty("runTime") + public int getRunTime() + { + return runTime; + } + + @JsonProperty("firehose") + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final int sleepTime = 2500; + if (firehoseFactory != null) { + log.info("Connecting firehose"); + firehoseFactory.connect(); + } log.info("Running noop task[%s]", getId()); - log.info("Sleeping for %,d millis.", sleepTime); - Thread.sleep(sleepTime); + log.info("Sleeping for %,d millis.", runTime); + Thread.sleep(runTime); log.info("Woke up!"); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java index fb15cd75354..23767a87cb3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java @@ -639,7 +639,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer log.info("Task[%s] just disappeared!", taskId); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); } else { - log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId); + log.info("Task[%s] went bye bye.", taskId); } break; } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java index c52745c5d58..da061eb9bc2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import io.druid.indexing.common.index.ChatHandler; import io.druid.indexing.common.index.ChatHandlerProvider; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.Response; @@ -39,6 +40,7 @@ public class ChatHandlerResource this.handlers = handlers; } + @POST @Path("/chat/{id}") public Object doTaskChat( @PathParam("id") String handlerId diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index f2a0659b9e9..dd3103a5b81 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; @@ -32,6 +33,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; +import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.coordinator.ForkingTaskRunner; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.worker.Worker; @@ -76,6 +78,8 @@ public class CliMiddleManager extends ServerRunnable binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); + binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); + binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 446283e0dee..a617b53d4c0 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -27,6 +27,7 @@ import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import com.google.inject.servlet.GuiceFilter; +import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; @@ -41,6 +42,7 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogs; @@ -131,6 +133,8 @@ public class CliOverlord extends ServerRunnable .to(ResourceManagementSchedulerFactoryImpl.class) .in(LazySingleton.class); + binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); + configureTaskStorage(binder); configureRunners(binder); configureAutoscale(binder); @@ -162,7 +166,10 @@ public class CliOverlord extends ServerRunnable private void configureRunners(Binder binder) { PolyBind.createChoice( - binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) + binder, + "druid.indexer.runner.type", + Key.get(TaskRunnerFactory.class), + Key.get(ForkingTaskRunnerFactory.class) ); final MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index b4b9ce4e661..aea4bfe881c 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -104,8 +104,10 @@ public class CliPeon extends GuiceRunnable final MapBinder handlerProviderBinder = PolyBind.optionBinder( binder, Key.get(ChatHandlerProvider.class) ); - handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); - handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); + handlerProviderBinder.addBinding("receiver") + .to(EventReceivingChatHandlerProvider.class).in(LazySingleton.class); + handlerProviderBinder.addBinding("noop") + .to(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);