mirror of https://github.com/apache/druid.git
Merger: Advertise event receivers through service discovery
This commit is contained in:
parent
341ee27419
commit
f463b95256
|
@ -277,12 +277,11 @@ public class Initialization
|
|||
)
|
||||
throws Exception
|
||||
{
|
||||
final ServiceInstance serviceInstance =
|
||||
ServiceInstance.builder()
|
||||
.name(config.getServiceName().replace('/', ':'))
|
||||
.address(addressFromHost(config.getHost()))
|
||||
.port(config.getPort())
|
||||
.build();
|
||||
final ServiceInstance serviceInstance = serviceInstance(
|
||||
config.getServiceName(),
|
||||
config.getHost(),
|
||||
config.getPort()
|
||||
);
|
||||
final ServiceDiscovery serviceDiscovery =
|
||||
ServiceDiscoveryBuilder.builder(Void.class)
|
||||
.basePath(config.getDiscoveryPath())
|
||||
|
@ -378,13 +377,24 @@ public class Initialization
|
|||
return String.format("%s/%s", basePath, PROP_SUBPATH);
|
||||
}
|
||||
|
||||
public static String addressFromHost(final String host)
|
||||
public static ServiceInstance serviceInstance(final String service, final String host, final int port)
|
||||
{
|
||||
final String address;
|
||||
final int colon = host.indexOf(':');
|
||||
if (colon < 0) {
|
||||
return host;
|
||||
address = host;
|
||||
} else {
|
||||
return host.substring(0, colon);
|
||||
address = host.substring(0, colon);
|
||||
}
|
||||
|
||||
try {
|
||||
return ServiceInstance.builder()
|
||||
.name(service.replace('/', ':'))
|
||||
.address(address)
|
||||
.port(port)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
|
|||
closed = true;
|
||||
|
||||
if (eventReceiverProvider.isPresent()) {
|
||||
eventReceiverProvider.get().unregister(firehoseId, this);
|
||||
eventReceiverProvider.get().unregister(firehoseId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,38 +4,109 @@ import com.google.common.base.Optional;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig;
|
||||
import com.netflix.curator.x.discovery.ServiceDiscovery;
|
||||
import com.netflix.curator.x.discovery.ServiceInstance;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* Provides a link between an {@link EventReceiver} and users. The {@link #get(String)} method allows anyone with a
|
||||
* reference to this object to obtain an event receiver with a particular name. An embedded {@link ServiceDiscovery}
|
||||
* instance, if provided, will be used to advertise event receivers on this host.
|
||||
*/
|
||||
public class EventReceiverProvider
|
||||
{
|
||||
private static final Logger log = new Logger(EventReceiverProvider.class);
|
||||
|
||||
private final ConcurrentMap<String, EventReceiver> receivers;
|
||||
private final EventReceiverProviderConfig config;
|
||||
private final ServiceDiscovery<?> discovery;
|
||||
private final ConcurrentMap<String, EventReceiverHolder> receivers;
|
||||
|
||||
public EventReceiverProvider()
|
||||
public EventReceiverProvider(
|
||||
EventReceiverProviderConfig config,
|
||||
ServiceDiscovery discovery
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.discovery = discovery;
|
||||
this.receivers = Maps.newConcurrentMap();
|
||||
}
|
||||
|
||||
public void register(final String key, EventReceiver receiver)
|
||||
{
|
||||
log.info("Registering event receiver for %s", key);
|
||||
if (receivers.putIfAbsent(key, receiver) != null) {
|
||||
log.info("Registering event receiver: %s", key);
|
||||
|
||||
final EventReceiverHolder holder = new EventReceiverHolder(
|
||||
receiver,
|
||||
isDiscoverable() ? Initialization.serviceInstance(
|
||||
String.format(config.getServiceFormat(), key),
|
||||
config.getHost(),
|
||||
config.getPort()
|
||||
) : null
|
||||
);
|
||||
|
||||
if (receivers.putIfAbsent(key, holder) != null) {
|
||||
throw new ISE("Receiver already registered for key: %s", key);
|
||||
}
|
||||
|
||||
if (isDiscoverable()) {
|
||||
try {
|
||||
discovery.registerService(holder.service);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to register service: %s", holder.service.getName());
|
||||
receivers.remove(key, holder);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void unregister(final String key, EventReceiver receiver)
|
||||
public void unregister(final String key)
|
||||
{
|
||||
log.info("Unregistering event receiver for %s", key);
|
||||
if (!receivers.remove(key, receiver)) {
|
||||
log.info("Unregistering event receiver: %s", key);
|
||||
|
||||
final EventReceiverHolder holder = receivers.get(key);
|
||||
if (holder == null) {
|
||||
log.warn("Receiver not currently registered, ignoring: %s", key);
|
||||
}
|
||||
|
||||
if (isDiscoverable()) {
|
||||
try {
|
||||
discovery.unregisterService(holder.service);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to unregister service: %s", holder.service.getName());
|
||||
}
|
||||
}
|
||||
|
||||
receivers.remove(key);
|
||||
}
|
||||
|
||||
public Optional<EventReceiver> get(final String key)
|
||||
{
|
||||
return Optional.fromNullable(receivers.get(key));
|
||||
final EventReceiverHolder holder = receivers.get(key);
|
||||
if (holder != null) {
|
||||
return Optional.of(holder.receiver);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isDiscoverable()
|
||||
{
|
||||
return discovery != null;
|
||||
}
|
||||
|
||||
private static class EventReceiverHolder
|
||||
{
|
||||
final EventReceiver receiver;
|
||||
final ServiceInstance service;
|
||||
|
||||
private EventReceiverHolder(EventReceiver receiver, ServiceInstance service)
|
||||
{
|
||||
this.receiver = receiver;
|
||||
this.service = service;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
package com.metamx.druid.merger.worker.config;
|
||||
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.DefaultNull;
|
||||
|
||||
public abstract class EventReceiverProviderConfig
|
||||
{
|
||||
@Config("druid.indexer.eventreceiver.service")
|
||||
@DefaultNull
|
||||
public abstract String getServiceFormat();
|
||||
|
||||
@Config("druid.host")
|
||||
public abstract String getHost();
|
||||
|
||||
@Config("druid.port")
|
||||
public abstract int getPort();
|
||||
}
|
|
@ -61,6 +61,7 @@ import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
|
|||
import com.metamx.druid.merger.common.index.EventReceiverProvider;
|
||||
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
|
||||
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
|
||||
import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig;
|
||||
import com.metamx.druid.merger.worker.config.WorkerConfig;
|
||||
import com.metamx.druid.realtime.SegmentAnnouncer;
|
||||
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
|
||||
|
@ -485,7 +486,10 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
|
|||
public void initializeEventReceiverProvider()
|
||||
{
|
||||
if (eventReceiverProvider == null) {
|
||||
this.eventReceiverProvider = new EventReceiverProvider();
|
||||
this.eventReceiverProvider = new EventReceiverProvider(
|
||||
configFactory.build(EventReceiverProviderConfig.class),
|
||||
serviceDiscovery
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue