diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/AddressPortServiceInstanceFactory.java b/client/src/main/java/com/metamx/druid/curator/discovery/AddressPortServiceInstanceFactory.java new file mode 100644 index 00000000000..3f4fa988655 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/discovery/AddressPortServiceInstanceFactory.java @@ -0,0 +1,27 @@ +package com.metamx.druid.curator.discovery; + +import com.google.common.base.Throwables; +import org.apache.curator.x.discovery.ServiceInstance; + +public class AddressPortServiceInstanceFactory implements ServiceInstanceFactory +{ + private final String address; + private final int port; + + public AddressPortServiceInstanceFactory(String address, int port) + { + this.address = address; + this.port = port; + } + + @Override + public ServiceInstance create(String service) + { + try { + return ServiceInstance.builder().name(service).address(address).port(port).build(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java b/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java new file mode 100644 index 00000000000..a3156b3205d --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java @@ -0,0 +1,81 @@ +package com.metamx.druid.curator.discovery; + +import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; + +import java.util.Map; + +/** + * Uses the Curator Service Discovery recipe to announce services. + */ +public class CuratorServiceAnnouncer implements ServiceAnnouncer +{ + private static final Logger log = new Logger(CuratorServiceAnnouncer.class); + + private final ServiceDiscovery discovery; + private final ServiceInstanceFactory instanceFactory; + private final Map> instanceMap = Maps.newHashMap(); + private final Object monitor = new Object(); + + public CuratorServiceAnnouncer( + ServiceDiscovery discovery, + ServiceInstanceFactory instanceFactory + ) + { + this.discovery = discovery; + this.instanceFactory = instanceFactory; + } + + @Override + public void announce(String service) throws Exception + { + final ServiceInstance instance; + + synchronized (monitor) { + if (instanceMap.containsKey(service)) { + log.warn("Ignoring request to announce service[%s]", service); + return; + } else { + instance = instanceFactory.create(service); + instanceMap.put(service, instance); + } + } + + try { + log.info("Announcing service[%s]", service); + discovery.registerService(instance); + } catch (Exception e) { + log.warn("Failed to announce service[%s]", service); + synchronized (monitor) { + instanceMap.remove(service); + } + } + } + + @Override + public void unannounce(String service) throws Exception + { + final ServiceInstance instance; + + synchronized (monitor) { + instance = instanceMap.get(service); + if (instance == null) { + log.warn("Ignoring request to unannounce service[%s]", service); + return; + } + } + + log.info("Unannouncing service[%s]", service); + try { + discovery.unregisterService(instance); + } catch (Exception e) { + log.warn(e, "Failed to unannounce service[%s]", service); + } finally { + synchronized (monitor) { + instanceMap.remove(service); + } + } + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/NoopServiceAnnouncer.java b/client/src/main/java/com/metamx/druid/curator/discovery/NoopServiceAnnouncer.java new file mode 100644 index 00000000000..782739f7a24 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/discovery/NoopServiceAnnouncer.java @@ -0,0 +1,19 @@ +package com.metamx.druid.curator.discovery; + +/** + * Does nothing. + */ +public class NoopServiceAnnouncer implements ServiceAnnouncer +{ + @Override + public void unannounce(String service) + { + + } + + @Override + public void announce(String service) + { + + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/ServiceAnnouncer.java b/client/src/main/java/com/metamx/druid/curator/discovery/ServiceAnnouncer.java new file mode 100644 index 00000000000..4e91122423a --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/discovery/ServiceAnnouncer.java @@ -0,0 +1,11 @@ +package com.metamx.druid.curator.discovery; + +/** + * Announces our ability to serve a particular function. Multiple users may announce the same service, in which + * case they are treated as interchangeable instances of that service. + */ +public interface ServiceAnnouncer +{ + public void announce(String service) throws Exception; + public void unannounce(String service) throws Exception; +} diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/ServiceInstanceFactory.java b/client/src/main/java/com/metamx/druid/curator/discovery/ServiceInstanceFactory.java new file mode 100644 index 00000000000..8f8cc19c0bd --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/discovery/ServiceInstanceFactory.java @@ -0,0 +1,8 @@ +package com.metamx.druid.curator.discovery; + +import org.apache.curator.x.discovery.ServiceInstance; + +public interface ServiceInstanceFactory +{ + public ServiceInstance create(String service); +} diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java index 6d1bb352f70..191c2ea4ad8 100644 --- a/client/src/main/java/com/metamx/druid/http/BrokerNode.java +++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java @@ -42,6 +42,8 @@ import com.metamx.druid.client.cache.MapCache; import com.metamx.druid.client.cache.MapCacheConfig; import com.metamx.druid.client.cache.MemcachedCache; import com.metamx.druid.client.cache.MemcachedCacheConfig; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; @@ -225,15 +227,17 @@ public class BrokerNode extends QueryableNode { if (useDiscovery) { final Lifecycle lifecycle = getLifecycle(); - final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class); - CuratorFramework curatorFramework = Initialization.makeCuratorFramework( + final CuratorFramework curatorFramework = Initialization.makeCuratorFramework( serviceDiscoveryConfig, lifecycle ); - final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( curatorFramework, serviceDiscoveryConfig, lifecycle ); + final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer( + serviceDiscoveryConfig, serviceDiscovery + ); + Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle); } } diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index 9a329d0bf16..1cf9af813c8 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -28,6 +28,10 @@ import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; +import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory; +import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.EmittingRequestLogger; import com.metamx.druid.http.FileRequestLogger; import com.metamx.druid.http.RequestLogger; @@ -214,16 +218,10 @@ public class Initialization ) throws Exception { - final ServiceInstance serviceInstance = serviceInstance( - config.getServiceName(), - config.getHost(), - config.getPort() - ); final ServiceDiscovery serviceDiscovery = ServiceDiscoveryBuilder.builder(Void.class) .basePath(config.getDiscoveryPath()) .client(discoveryClient) - .thisInstance(serviceInstance) .build(); lifecycle.addHandler( @@ -251,6 +249,46 @@ public class Initialization return serviceDiscovery; } + public static ServiceAnnouncer makeServiceAnnouncer( + ServiceDiscoveryConfig config, + ServiceDiscovery serviceDiscovery + ) + { + final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config); + return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory); + } + + public static void announceDefaultService( + final ServiceDiscoveryConfig config, + final ServiceAnnouncer serviceAnnouncer, + final Lifecycle lifecycle + ) throws Exception + { + final String service = config.getServiceName().replace('/', ':'); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + serviceAnnouncer.announce(service); + } + + @Override + public void stop() + { + try { + serviceAnnouncer.unannounce(service); + } + catch (Exception e) { + log.warn(e, "Failed to unannouce default service[%s]", service); + } + } + } + ); + } + public static ServiceProvider makeServiceProvider( String serviceName, ServiceDiscovery serviceDiscovery, @@ -309,8 +347,9 @@ public class Initialization ); } - public static ServiceInstance serviceInstance(final String service, final String host, final int port) + public static ServiceInstanceFactory makeServiceInstanceFactory(ServiceDiscoveryConfig config) { + final String host = config.getHost(); final String address; final int colon = host.indexOf(':'); if (colon < 0) { @@ -319,14 +358,6 @@ public class Initialization address = host.substring(0, colon); } - try { - return ServiceInstance.builder() - .name(service.replace('/', ':')) - .address(address) - .port(port) - .build(); - } catch (Exception e) { - throw Throwables.propagate(e); - } + return new AddressPortServiceInstanceFactory(address, config.getPort()); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java index 0b4ccd96648..ce46a7b37dd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java @@ -4,10 +4,9 @@ import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; -import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig; import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceInstance; import java.util.concurrent.ConcurrentMap; @@ -21,92 +20,65 @@ public class EventReceiverProvider private static final Logger log = new Logger(EventReceiverProvider.class); private final EventReceiverProviderConfig config; - private final ServiceDiscovery discovery; - private final ConcurrentMap receivers; + private final ServiceAnnouncer serviceAnnouncer; + private final ConcurrentMap receivers; public EventReceiverProvider( EventReceiverProviderConfig config, - ServiceDiscovery discovery + ServiceAnnouncer serviceAnnouncer ) { this.config = config; - this.discovery = discovery; + this.serviceAnnouncer = serviceAnnouncer; this.receivers = Maps.newConcurrentMap(); } public void register(final String key, EventReceiver receiver) { - log.info("Registering event receiver: %s", key); + final String service = serviceName(key); + log.info("Registering EventReceiver: %s", key); - final EventReceiverHolder holder = new EventReceiverHolder( - receiver, - isDiscoverable() ? Initialization.serviceInstance( - String.format(config.getServiceFormat(), key), - config.getHost(), - config.getPort() - ) : null - ); - - if (receivers.putIfAbsent(key, holder) != null) { + if (receivers.putIfAbsent(key, receiver) != null) { throw new ISE("Receiver already registered for key: %s", key); } - if (isDiscoverable()) { - try { - discovery.registerService(holder.service); - } - catch (Exception e) { - log.warn(e, "Failed to register service: %s", holder.service.getName()); - receivers.remove(key, holder); - } + try { + serviceAnnouncer.announce(service); + } + catch (Exception e) { + log.warn(e, "Failed to register service: %s", service); + receivers.remove(key, receiver); } } public void unregister(final String key) { + final String service = serviceName(key); + log.info("Unregistering event receiver: %s", key); - final EventReceiverHolder holder = receivers.get(key); - if (holder == null) { + final EventReceiver receiver = receivers.get(key); + if (receiver == null) { log.warn("Receiver not currently registered, ignoring: %s", key); } - if (isDiscoverable()) { - try { - discovery.unregisterService(holder.service); - } - catch (Exception e) { - log.warn(e, "Failed to unregister service: %s", holder.service.getName()); - } + try { + serviceAnnouncer.unannounce(service); + } + catch (Exception e) { + log.warn(e, "Failed to unregister service: %s", service); } - receivers.remove(key); + receivers.remove(key, receiver); } public Optional get(final String key) { - final EventReceiverHolder holder = receivers.get(key); - if (holder != null) { - return Optional.of(holder.receiver); - } else { - return Optional.absent(); - } + return Optional.fromNullable(receivers.get(key)); } - public boolean isDiscoverable() + private String serviceName(String key) { - return discovery != null; - } - - private static class EventReceiverHolder - { - final EventReceiver receiver; - final ServiceInstance service; - - private EventReceiverHolder(EventReceiver receiver, ServiceInstance service) - { - this.receiver = receiver; - this.service = service; - } + return String.format(config.getServiceFormat(), key); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java index f2a013ab803..68ded354100 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.merger.common.actions.TaskActionClient; @@ -68,6 +69,7 @@ public class TaskMasterLifecycle final TaskRunnerFactory runnerFactory, final ResourceManagementSchedulerFactory managementSchedulerFactory, final CuratorFramework curator, + final ServiceAnnouncer serviceAnnouncer, final ServiceEmitter emitter ) { @@ -101,7 +103,7 @@ public class TaskMasterLifecycle final Lifecycle leaderLifecycle = new Lifecycle(); leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(taskRunner); - Initialization.makeServiceDiscoveryClient(curator, serviceDiscoveryConfig, leaderLifecycle); + Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); leaderLifecycle.addManagedInstance(taskConsumer); leaderLifecycle.addManagedInstance(resourceManagementScheduler); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index f90de714778..931f8779cf4 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -46,6 +46,9 @@ import com.metamx.druid.QueryableNode; import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.JacksonConfigManager; +import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.GuiceServletConfig; @@ -111,6 +114,7 @@ import com.metamx.metrics.MonitorSchedulerConfig; import com.metamx.metrics.SysMonitor; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.x.discovery.ServiceDiscovery; import org.jets3t.service.S3ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.security.AWSCredentials; @@ -152,6 +156,8 @@ public class IndexerCoordinatorNode extends QueryableNode private DataSegmentPusher segmentPusher = null; private TaskToolboxFactory taskToolboxFactory = null; private ServiceDiscovery serviceDiscovery = null; + private ServiceAnnouncer serviceAnnouncer = null; private ServiceProvider coordinatorServiceProvider = null; private Server server = null; private ExecutorServiceTaskRunner taskRunner = null; @@ -186,7 +191,6 @@ public class ExecutorNode extends BaseServerNode initializeMonitors(); initializeMergerConfig(); initializeServiceDiscovery(); - initializeCoordinatorServiceProvider(); initializeDataSegmentPusher(); initializeTaskToolbox(); initializeTaskRunner(); @@ -387,16 +391,16 @@ public class ExecutorNode extends BaseServerNode public void initializeServiceDiscovery() throws Exception { + final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class); if (serviceDiscovery == null) { - final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class); this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( getCuratorFramework(), config, lifecycle ); } - } - - public void initializeCoordinatorServiceProvider() - { + if (serviceAnnouncer == null) { + final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config); + this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory); + } if (coordinatorServiceProvider == null) { this.coordinatorServiceProvider = Initialization.makeServiceProvider( workerConfig.getMasterService(), @@ -425,9 +429,17 @@ public class ExecutorNode extends BaseServerNode public void initializeEventReceiverProvider() { if (eventReceiverProvider == null) { + final EventReceiverProviderConfig config = configFactory.build(EventReceiverProviderConfig.class); + final ServiceAnnouncer myServiceAnnouncer; + if (config.getServiceFormat() == null) { + log.info("EventReceiverProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); + myServiceAnnouncer = new NoopServiceAnnouncer(); + } else { + myServiceAnnouncer = serviceAnnouncer; + } this.eventReceiverProvider = new EventReceiverProvider( - configFactory.build(EventReceiverProviderConfig.class), - serviceDiscovery + config, + myServiceAnnouncer ); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 1e13070653c..10fb15755f2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -34,6 +34,9 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.QueryableNode; +import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.Initialization; @@ -102,6 +105,7 @@ public class WorkerNode extends QueryableNode private ServiceEmitter emitter = null; private WorkerConfig workerConfig = null; private ServiceDiscovery serviceDiscovery = null; + private ServiceAnnouncer serviceAnnouncer = null; private ServiceProvider coordinatorServiceProvider = null; private WorkerCuratorCoordinator workerCuratorCoordinator = null; private WorkerTaskMonitor workerTaskMonitor = null; @@ -177,7 +181,6 @@ public class WorkerNode extends QueryableNode initializeMonitors(); initializeMergerConfig(); initializeServiceDiscovery(); - initializeCoordinatorServiceProvider(); initializeJacksonInjections(); initializeJacksonSubtypes(); initializeCuratorCoordinator(); @@ -336,10 +339,6 @@ public class WorkerNode extends QueryableNode getLifecycle() ); } - } - - public void initializeCoordinatorServiceProvider() - { if (coordinatorServiceProvider == null) { this.coordinatorServiceProvider = Initialization.makeServiceProvider( workerConfig.getMasterService(), diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 494f05c7cfc..0eb9f53a360 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -39,6 +39,7 @@ import com.metamx.druid.concurrent.Execs; import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.JacksonConfigManager; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManagerConfig; import com.metamx.druid.db.DatabaseSegmentManager; @@ -174,6 +175,10 @@ public class MasterMain serviceDiscoveryConfig, lifecycle ); + final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer( + serviceDiscoveryConfig, serviceDiscovery + ); + Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle); IndexingServiceClient indexingServiceClient = null; if (druidMasterConfig.getMergerServiceName() != null) {