diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java index c89b079a30c..1662abc7d9b 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java @@ -20,11 +20,6 @@ package com.metamx.druid.indexing.common.index; import com.google.common.base.Optional; -import com.google.common.collect.Maps; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; /** */ @@ -34,62 +29,5 @@ public interface ChatHandlerProvider public void unregister(final String key); - public ChatHandlerProvider( - ChatHandlerProviderConfig config, - ServiceAnnouncer serviceAnnouncer - ) - { - this.config = config; - this.serviceAnnouncer = serviceAnnouncer; - this.handlers = Maps.newConcurrentMap(); - } - - public void register(final String key, ChatHandler handler) - { - final String service = serviceName(key); - log.info("Registering Eventhandler: %s", key); - - if (handlers.putIfAbsent(key, handler) != null) { - throw new ISE("handler already registered for key: %s", key); - } - - try { - serviceAnnouncer.announce(service); - } - catch (Exception e) { - log.warn(e, "Failed to register service: %s", service); - handlers.remove(key, handler); - } - } - - public void unregister(final String key) - { - final String service = serviceName(key); - - log.info("Unregistering chat handler: %s", key); - - final ChatHandler handler = handlers.get(key); - if (handler == null) { - log.warn("handler not currently registered, ignoring: %s", key); - } - - try { - serviceAnnouncer.unannounce(service); - } - catch (Exception e) { - log.warn(e, "Failed to unregister service: %s", service); - } - - handlers.remove(key, handler); - } - - public Optional get(final String key) - { - return Optional.fromNullable(handlers.get(key)); - } - - private String serviceName(String key) - { - return String.format(config.getServiceFormat(), key); - } + public Optional get(final String key); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java index f18a79c241c..c4ae8cec851 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java @@ -25,6 +25,7 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; +import com.metamx.druid.initialization.DruidNode; import java.util.concurrent.ConcurrentMap; @@ -52,44 +53,43 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider } @Override - public void register(final String key, ChatHandler handler) + public void register(final String service, ChatHandler handler) { - final String service = serviceName(key); - log.info("Registering Eventhandler: %s", key); + final DruidNode node = makeDruidNode(service); + log.info("Registering Eventhandler[%s]", service); - if (handlers.putIfAbsent(key, handler) != null) { - throw new ISE("handler already registered for key: %s", key); + if (handlers.putIfAbsent(service, handler) != null) { + throw new ISE("handler already registered for service[%s]", service); } try { - serviceAnnouncer.announce(service); + serviceAnnouncer.announce(node); } catch (Exception e) { - log.warn(e, "Failed to register service: %s", service); - handlers.remove(key, handler); + log.warn(e, "Failed to register service[%s]", service); + handlers.remove(service, handler); } } @Override - public void unregister(final String key) + public void unregister(final String service) { - final String service = serviceName(key); - log.info("Unregistering chat handler: %s", key); + log.info("Unregistering chat handler[%s]", service); - final ChatHandler handler = handlers.get(key); + final ChatHandler handler = handlers.get(service); if (handler == null) { - log.warn("handler not currently registered, ignoring: %s", key); + log.warn("handler[%s] not currently registered, ignoring.", service); } try { - serviceAnnouncer.unannounce(service); + serviceAnnouncer.unannounce(makeDruidNode(service)); } catch (Exception e) { - log.warn(e, "Failed to unregister service: %s", service); + log.warn(e, "Failed to unregister service[%s]", service); } - handlers.remove(key, handler); + handlers.remove(service, handler); } @Override @@ -98,8 +98,8 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider return Optional.fromNullable(handlers.get(key)); } - private String serviceName(String key) + private DruidNode makeDruidNode(String key) { - return String.format(config.getServiceFormat(), key); + return new DruidNode(key, config.getHost(), config.getPort()); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java index 8388c9653e3..7f43a31a746 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java @@ -31,13 +31,13 @@ import com.google.common.collect.Sets; import com.google.common.collect.TreeMultiset; import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; -import com.metamx.druid.input.InputRow; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolbox; import com.metamx.druid.indexing.common.actions.SpawnTasksAction; +import com.metamx.druid.input.InputRow; +import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; -import com.metamx.druid.realtime.Schema; import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.shard.SingleDimensionShardSpec; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java index 4722222bf52..d98868f989d 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java @@ -30,13 +30,13 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; -import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolbox; import com.metamx.druid.indexing.common.actions.SegmentInsertAction; import com.metamx.druid.indexing.common.actions.SegmentListUsedAction; import com.metamx.druid.indexing.common.actions.SpawnTasksAction; import com.metamx.druid.indexing.common.actions.TaskActionClient; +import com.metamx.druid.loading.SegmentLoadingException; import org.joda.time.DateTime; import org.joda.time.Interval; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java index 4ef331e2de5..d1823d36cda 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java @@ -26,8 +26,6 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.indexing.common.actions.TaskActionClient; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.task.Task; @@ -35,6 +33,8 @@ import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.indexing.coordinator.exec.TaskConsumer; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; +import com.metamx.druid.initialization.DruidNode; +import com.metamx.druid.initialization.Initialization; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import org.apache.curator.framework.CuratorFramework; @@ -66,7 +66,7 @@ public class TaskMasterLifecycle final TaskQueue taskQueue, final TaskActionClientFactory taskActionClientFactory, final IndexerCoordinatorConfig indexerCoordinatorConfig, - final DruidNode nodeConfig, + final DruidNode node, final TaskRunnerFactory runnerFactory, final ResourceManagementSchedulerFactory managementSchedulerFactory, final CuratorFramework curator, @@ -119,7 +119,7 @@ public class TaskMasterLifecycle } ); leaderLifecycle.addManagedInstance(taskQueue); - Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); + Initialization.announceDefaultService(node, serviceAnnouncer, leaderLifecycle); leaderLifecycle.addManagedInstance(taskConsumer); if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java index ac2d680ebc0..202ca0f55f5 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java @@ -40,7 +40,6 @@ import com.metamx.druid.query.segment.QuerySegmentWalker; import com.metamx.druid.query.segment.SegmentDescriptor; import com.metamx.emitter.EmittingLogger; import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.File; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java index deca87c56f5..2474704a418 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java @@ -20,7 +20,6 @@ package com.metamx.druid.indexing.worker.config; import org.skife.config.Config; -import org.skife.config.Default; /** */ diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 4b89b40e949..351d617fdcf 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -388,14 +388,12 @@ public class ExecutorNode extends BaseServerNode { if (chatHandlerProvider == null) { final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class); - final ServiceAnnouncer myServiceAnnouncer; if (config.isPublishDiscovery()) { - myServiceAnnouncer = serviceAnnouncer; + this.chatHandlerProvider = new EventReceivingChatHandlerProvider(config, serviceAnnouncer); } else { log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); - myServiceAnnouncer = new NoopServiceAnnouncer(); + this.chatHandlerProvider = new NoopChatHandlerProvider(); } - this.chatHandlerProvider = new ChatHandlerProvider(config, myServiceAnnouncer); } } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 6c6da7ccb89..0c39406b821 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -45,7 +45,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import javax.annotation.Nullable; import java.io.File; import java.util.Arrays; import java.util.Set; diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index 6e8171f2072..77e7c2fcb49 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -36,13 +36,6 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexer.granularity.UniformGranularitySpec; -import com.metamx.druid.indexing.common.task.TaskResource; -import com.metamx.druid.input.InputRow; -import com.metamx.druid.input.MapBasedInputRow; -import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.DataSegmentKiller; -import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolbox; @@ -57,6 +50,7 @@ import com.metamx.druid.indexing.common.task.AbstractTask; import com.metamx.druid.indexing.common.task.IndexTask; import com.metamx.druid.indexing.common.task.KillTask; import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.common.task.TaskResource; import com.metamx.druid.indexing.coordinator.exec.TaskConsumer; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java index 8f512390f14..27442ed1cdc 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -28,11 +28,11 @@ import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.google.common.collect.Lists; -import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.indexing.coordinator.setup.EC2NodeData; import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; +import com.metamx.druid.jackson.DefaultObjectMapper; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index f73d289526d..fcbc1d4113b 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -22,7 +22,6 @@ package com.metamx.druid.indexing.coordinator.scaling; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexing.TestTask; @@ -45,10 +44,8 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; /**