From f463b9525633491c5dfa12bfd337a0f9ad51fce8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 25 Apr 2013 13:57:23 +0300 Subject: [PATCH] Merger: Advertise event receivers through service discovery --- .../druid/initialization/Initialization.java | 28 ++++-- .../index/EventReceiverFirehoseFactory.java | 2 +- .../common/index/EventReceiverProvider.java | 87 +++++++++++++++++-- .../config/EventReceiverProviderConfig.java | 17 ++++ .../merger/worker/executor/ExecutorNode.java | 6 +- 5 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 merger/src/main/java/com/metamx/druid/merger/worker/config/EventReceiverProviderConfig.java diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index 314cc615095..96b88350c70 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -277,12 +277,11 @@ public class Initialization ) throws Exception { - final ServiceInstance serviceInstance = - ServiceInstance.builder() - .name(config.getServiceName().replace('/', ':')) - .address(addressFromHost(config.getHost())) - .port(config.getPort()) - .build(); + final ServiceInstance serviceInstance = serviceInstance( + config.getServiceName(), + config.getHost(), + config.getPort() + ); final ServiceDiscovery serviceDiscovery = ServiceDiscoveryBuilder.builder(Void.class) .basePath(config.getDiscoveryPath()) @@ -378,13 +377,24 @@ public class Initialization return String.format("%s/%s", basePath, PROP_SUBPATH); } - public static String addressFromHost(final String host) + public static ServiceInstance serviceInstance(final String service, final String host, final int port) { + final String address; final int colon = host.indexOf(':'); if (colon < 0) { - return host; + address = host; } else { - return host.substring(0, colon); + address = host.substring(0, colon); + } + + try { + return ServiceInstance.builder() + .name(service.replace('/', ':')) + .address(address) + .port(port) + .build(); + } catch (Exception e) { + throw Throwables.propagate(e); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java index d1e7b0bd3b9..bc7c65e9897 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java @@ -170,7 +170,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory closed = true; if (eventReceiverProvider.isPresent()) { - eventReceiverProvider.get().unregister(firehoseId, this); + eventReceiverProvider.get().unregister(firehoseId); } } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java index 2e8cd705528..03ecbe10836 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverProvider.java @@ -4,38 +4,109 @@ import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig; +import com.netflix.curator.x.discovery.ServiceDiscovery; +import com.netflix.curator.x.discovery.ServiceInstance; import java.util.concurrent.ConcurrentMap; +/** + * Provides a link between an {@link EventReceiver} and users. The {@link #get(String)} method allows anyone with a + * reference to this object to obtain an event receiver with a particular name. An embedded {@link ServiceDiscovery} + * instance, if provided, will be used to advertise event receivers on this host. + */ public class EventReceiverProvider { private static final Logger log = new Logger(EventReceiverProvider.class); - private final ConcurrentMap receivers; + private final EventReceiverProviderConfig config; + private final ServiceDiscovery discovery; + private final ConcurrentMap receivers; - public EventReceiverProvider() + public EventReceiverProvider( + EventReceiverProviderConfig config, + ServiceDiscovery discovery + ) { + this.config = config; + this.discovery = discovery; this.receivers = Maps.newConcurrentMap(); } public void register(final String key, EventReceiver receiver) { - log.info("Registering event receiver for %s", key); - if (receivers.putIfAbsent(key, receiver) != null) { + log.info("Registering event receiver: %s", key); + + final EventReceiverHolder holder = new EventReceiverHolder( + receiver, + isDiscoverable() ? Initialization.serviceInstance( + String.format(config.getServiceFormat(), key), + config.getHost(), + config.getPort() + ) : null + ); + + if (receivers.putIfAbsent(key, holder) != null) { throw new ISE("Receiver already registered for key: %s", key); } + + if (isDiscoverable()) { + try { + discovery.registerService(holder.service); + } + catch (Exception e) { + log.warn(e, "Failed to register service: %s", holder.service.getName()); + receivers.remove(key, holder); + } + } } - public void unregister(final String key, EventReceiver receiver) + public void unregister(final String key) { - log.info("Unregistering event receiver for %s", key); - if (!receivers.remove(key, receiver)) { + log.info("Unregistering event receiver: %s", key); + + final EventReceiverHolder holder = receivers.get(key); + if (holder == null) { log.warn("Receiver not currently registered, ignoring: %s", key); } + + if (isDiscoverable()) { + try { + discovery.unregisterService(holder.service); + } + catch (Exception e) { + log.warn(e, "Failed to unregister service: %s", holder.service.getName()); + } + } + + receivers.remove(key); } public Optional get(final String key) { - return Optional.fromNullable(receivers.get(key)); + final EventReceiverHolder holder = receivers.get(key); + if (holder != null) { + return Optional.of(holder.receiver); + } else { + return Optional.absent(); + } + } + + public boolean isDiscoverable() + { + return discovery != null; + } + + private static class EventReceiverHolder + { + final EventReceiver receiver; + final ServiceInstance service; + + private EventReceiverHolder(EventReceiver receiver, ServiceInstance service) + { + this.receiver = receiver; + this.service = service; + } } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/config/EventReceiverProviderConfig.java b/merger/src/main/java/com/metamx/druid/merger/worker/config/EventReceiverProviderConfig.java new file mode 100644 index 00000000000..4c1ffdb960d --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/worker/config/EventReceiverProviderConfig.java @@ -0,0 +1,17 @@ +package com.metamx.druid.merger.worker.config; + +import org.skife.config.Config; +import org.skife.config.DefaultNull; + +public abstract class EventReceiverProviderConfig +{ + @Config("druid.indexer.eventreceiver.service") + @DefaultNull + public abstract String getServiceFormat(); + + @Config("druid.host") + public abstract String getHost(); + + @Config("druid.port") + public abstract int getPort(); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java index f00f9b7bf34..cf1d576c179 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java @@ -61,6 +61,7 @@ import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.merger.common.index.EventReceiverProvider; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; +import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig; import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.druid.realtime.ZkSegmentAnnouncer; @@ -485,7 +486,10 @@ public class ExecutorNode extends BaseServerNode public void initializeEventReceiverProvider() { if (eventReceiverProvider == null) { - this.eventReceiverProvider = new EventReceiverProvider(); + this.eventReceiverProvider = new EventReceiverProvider( + configFactory.build(EventReceiverProviderConfig.class), + serviceDiscovery + ); } }