fix chat handler resources not correctly registering themselves

This commit is contained in:
fjy 2013-10-01 11:25:39 -07:00
parent d6445cd8f3
commit 5d0d71250b
10 changed files with 82 additions and 22 deletions

View File

@ -25,6 +25,8 @@ import com.google.common.base.Optional;
*/ */
public interface ChatHandlerProvider public interface ChatHandlerProvider
{ {
public String getType();
public void register(final String key, ChatHandler handler); public void register(final String key, ChatHandler handler);
public void unregister(final String key); public void unregister(final String key);

View File

@ -57,20 +57,20 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class); private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100000; private static final int DEFAULT_BUFFER_SIZE = 100000;
private final String firehoseId; private final String serviceName;
private final int bufferSize; private final int bufferSize;
private final MapInputRowParser parser; private final MapInputRowParser parser;
private final Optional<EventReceivingChatHandlerProvider> chatHandlerProvider; private final Optional<ChatHandlerProvider> chatHandlerProvider;
@JsonCreator @JsonCreator
public EventReceiverFirehoseFactory( public EventReceiverFirehoseFactory(
@JsonProperty("firehoseId") String firehoseId, @JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser, @JsonProperty("parser") MapInputRowParser parser,
@JacksonInject("chatHandlerProvider") EventReceivingChatHandlerProvider chatHandlerProvider @JacksonInject ChatHandlerProvider chatHandlerProvider
) )
{ {
this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId"); this.serviceName = Preconditions.checkNotNull(serviceName, "serviceName");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser"); this.parser = Preconditions.checkNotNull(parser, "parser");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@ -79,21 +79,24 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
@Override @Override
public Firehose connect() throws IOException public Firehose connect() throws IOException
{ {
log.info("Connecting firehose: %s", firehoseId); log.info("Connecting firehose: %s", serviceName);
final EventReceiverFirehose firehose = new EventReceiverFirehose(); final EventReceiverFirehose firehose = new EventReceiverFirehose();
if (chatHandlerProvider.isPresent()) { if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().register(firehoseId, firehose); log.info("Found chathandler with type[%s]", chatHandlerProvider.get().getType());
chatHandlerProvider.get().register(serviceName, firehose);
} else {
log.info("No chathandler detected");
} }
return firehose; return firehose;
} }
@JsonProperty @JsonProperty
public String getFirehoseId() public String getServiceName()
{ {
return firehoseId; return serviceName;
} }
@JsonProperty @JsonProperty
@ -111,7 +114,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
public class EventReceiverFirehose implements ChatHandler, Firehose public class EventReceiverFirehose implements ChatHandler, Firehose
{ {
private final BlockingQueue<InputRow> buffer; private final BlockingQueue<InputRow> buffer;
private final Object readLock = new Object(); private final Object readLock = new Object();
private volatile InputRow nextRow = null; private volatile InputRow nextRow = null;
private volatile boolean closed = false; private volatile boolean closed = false;
@ -125,7 +130,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
@Produces("application/json") @Produces("application/json")
public Response addAll(Collection<Map<String, Object>> events) public Response addAll(Collection<Map<String, Object>> events)
{ {
log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId); log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList(); final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) { for (final Map<String, Object> event : events) {
@ -146,7 +151,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
} }
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
} catch (InterruptedException e) { }
catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
@ -167,7 +173,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
} }
return nextRow != null; return nextRow != null;
} }
} }
@Override @Override
@ -205,7 +211,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
closed = true; closed = true;
if (chatHandlerProvider.isPresent()) { if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(firehoseId); chatHandlerProvider.get().unregister(serviceName);
} }
} }
} }

View File

@ -54,6 +54,12 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
this.handlers = Maps.newConcurrentMap(); this.handlers = Maps.newConcurrentMap();
} }
@Override
public String getType()
{
return "eventReceiving";
}
@Override @Override
public void register(final String service, ChatHandler handler) public void register(final String service, ChatHandler handler)
{ {
@ -76,7 +82,6 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
@Override @Override
public void unregister(final String service) public void unregister(final String service)
{ {
log.info("Unregistering chat handler[%s]", service); log.info("Unregistering chat handler[%s]", service);
final ChatHandler handler = handlers.get(service); final ChatHandler handler = handlers.get(service);

View File

@ -25,6 +25,12 @@ import com.google.common.base.Optional;
*/ */
public class NoopChatHandlerProvider implements ChatHandlerProvider public class NoopChatHandlerProvider implements ChatHandlerProvider
{ {
@Override
public String getType()
{
return "noop";
}
@Override @Override
public void register(String key, ChatHandler handler) public void register(String key, ChatHandler handler)
{ {

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -33,11 +34,17 @@ import org.joda.time.Period;
public class NoopTask extends AbstractTask public class NoopTask extends AbstractTask
{ {
private static final Logger log = new Logger(NoopTask.class); private static final Logger log = new Logger(NoopTask.class);
private static int defaultRunTime = 2500;
private final int runTime;
private final FirehoseFactory firehoseFactory;
@JsonCreator @JsonCreator
public NoopTask( public NoopTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@JsonProperty("interval") Interval interval @JsonProperty("interval") Interval interval,
@JsonProperty("runTime") int runTime,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
) )
{ {
super( super(
@ -45,6 +52,10 @@ public class NoopTask extends AbstractTask
"none", "none",
interval == null ? new Interval(Period.days(1), new DateTime()) : interval interval == null ? new Interval(Period.days(1), new DateTime()) : interval
); );
this.runTime = (runTime == 0) ? defaultRunTime : runTime;
this.firehoseFactory = firehoseFactory;
} }
@Override @Override
@ -53,14 +64,29 @@ public class NoopTask extends AbstractTask
return "noop"; return "noop";
} }
@JsonProperty("runTime")
public int getRunTime()
{
return runTime;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
final int sleepTime = 2500; if (firehoseFactory != null) {
log.info("Connecting firehose");
firehoseFactory.connect();
}
log.info("Running noop task[%s]", getId()); log.info("Running noop task[%s]", getId());
log.info("Sleeping for %,d millis.", sleepTime); log.info("Sleeping for %,d millis.", runTime);
Thread.sleep(sleepTime); Thread.sleep(runTime);
log.info("Woke up!"); log.info("Woke up!");
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }

View File

@ -639,7 +639,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Task[%s] just disappeared!", taskId); log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else { } else {
log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId); log.info("Task[%s] went bye bye.", taskId);
} }
break; break;
} }

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import io.druid.indexing.common.index.ChatHandler; import io.druid.indexing.common.index.ChatHandler;
import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.ChatHandlerProvider;
import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
@ -39,6 +40,7 @@ public class ChatHandlerResource
this.handlers = handlers; this.handlers = handlers;
} }
@POST
@Path("/chat/{id}") @Path("/chat/{id}")
public Object doTaskChat( public Object doTaskChat(
@PathParam("id") String handlerId @PathParam("id") String handlerId

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
@ -32,6 +33,7 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule; import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.coordinator.ForkingTaskRunner; import io.druid.indexing.coordinator.ForkingTaskRunner;
import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
@ -76,6 +78,8 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);

View File

@ -27,6 +27,7 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import com.google.inject.servlet.GuiceFilter; import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
@ -41,6 +42,7 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogs; import io.druid.indexing.common.tasklogs.TaskLogs;
@ -131,6 +133,8 @@ public class CliOverlord extends ServerRunnable
.to(ResourceManagementSchedulerFactoryImpl.class) .to(ResourceManagementSchedulerFactoryImpl.class)
.in(LazySingleton.class); .in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
configureTaskStorage(binder); configureTaskStorage(binder);
configureRunners(binder); configureRunners(binder);
configureAutoscale(binder); configureAutoscale(binder);
@ -162,7 +166,10 @@ public class CliOverlord extends ServerRunnable
private void configureRunners(Binder binder) private void configureRunners(Binder binder)
{ {
PolyBind.createChoice( PolyBind.createChoice(
binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) binder,
"druid.indexer.runner.type",
Key.get(TaskRunnerFactory.class),
Key.get(ForkingTaskRunnerFactory.class)
); );
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class));

View File

@ -104,8 +104,10 @@ public class CliPeon extends GuiceRunnable
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder( final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class) binder, Key.get(ChatHandlerProvider.class)
); );
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); handlerProviderBinder.addBinding("receiver")
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); .to(EventReceivingChatHandlerProvider.class).in(LazySingleton.class);
handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);