ServiceAnnouncer:

- Interface with two impls, Noop and Curator
- Used wherever fine services are advertised

Initialization:
- Don't automatically register with service discovery
- Add makeServiceAnnouncer, announceDefaultService methods
- Replace serviceInstance with makeServiceInstanceFactory
This commit is contained in:
Gian Merlino 2013-05-03 14:29:17 +03:00
parent 8ce55ac632
commit af08ea7617
13 changed files with 281 additions and 88 deletions

View File

@ -0,0 +1,27 @@
package com.metamx.druid.curator.discovery;
import com.google.common.base.Throwables;
import org.apache.curator.x.discovery.ServiceInstance;
public class AddressPortServiceInstanceFactory implements ServiceInstanceFactory<Void>
{
private final String address;
private final int port;
public AddressPortServiceInstanceFactory(String address, int port)
{
this.address = address;
this.port = port;
}
@Override
public ServiceInstance<Void> create(String service)
{
try {
return ServiceInstance.<Void>builder().name(service).address(address).port(port).build();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,81 @@
package com.metamx.druid.curator.discovery;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import java.util.Map;
/**
* Uses the Curator Service Discovery recipe to announce services.
*/
public class CuratorServiceAnnouncer<T> implements ServiceAnnouncer
{
private static final Logger log = new Logger(CuratorServiceAnnouncer.class);
private final ServiceDiscovery<T> discovery;
private final ServiceInstanceFactory<T> instanceFactory;
private final Map<String, ServiceInstance<T>> instanceMap = Maps.newHashMap();
private final Object monitor = new Object();
public CuratorServiceAnnouncer(
ServiceDiscovery<T> discovery,
ServiceInstanceFactory<T> instanceFactory
)
{
this.discovery = discovery;
this.instanceFactory = instanceFactory;
}
@Override
public void announce(String service) throws Exception
{
final ServiceInstance<T> instance;
synchronized (monitor) {
if (instanceMap.containsKey(service)) {
log.warn("Ignoring request to announce service[%s]", service);
return;
} else {
instance = instanceFactory.create(service);
instanceMap.put(service, instance);
}
}
try {
log.info("Announcing service[%s]", service);
discovery.registerService(instance);
} catch (Exception e) {
log.warn("Failed to announce service[%s]", service);
synchronized (monitor) {
instanceMap.remove(service);
}
}
}
@Override
public void unannounce(String service) throws Exception
{
final ServiceInstance<T> instance;
synchronized (monitor) {
instance = instanceMap.get(service);
if (instance == null) {
log.warn("Ignoring request to unannounce service[%s]", service);
return;
}
}
log.info("Unannouncing service[%s]", service);
try {
discovery.unregisterService(instance);
} catch (Exception e) {
log.warn(e, "Failed to unannounce service[%s]", service);
} finally {
synchronized (monitor) {
instanceMap.remove(service);
}
}
}
}

View File

@ -0,0 +1,19 @@
package com.metamx.druid.curator.discovery;
/**
* Does nothing.
*/
public class NoopServiceAnnouncer implements ServiceAnnouncer
{
@Override
public void unannounce(String service)
{
}
@Override
public void announce(String service)
{
}
}

View File

@ -0,0 +1,11 @@
package com.metamx.druid.curator.discovery;
/**
* Announces our ability to serve a particular function. Multiple users may announce the same service, in which
* case they are treated as interchangeable instances of that service.
*/
public interface ServiceAnnouncer
{
public void announce(String service) throws Exception;
public void unannounce(String service) throws Exception;
}

View File

@ -0,0 +1,8 @@
package com.metamx.druid.curator.discovery;
import org.apache.curator.x.discovery.ServiceInstance;
public interface ServiceInstanceFactory<T>
{
public ServiceInstance<T> create(String service);
}

View File

@ -42,6 +42,8 @@ import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
@ -225,15 +227,17 @@ public class BrokerNode extends QueryableNode<BrokerNode>
{
if (useDiscovery) {
final Lifecycle lifecycle = getLifecycle();
final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
serviceDiscoveryConfig, lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, serviceDiscoveryConfig, lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
serviceDiscoveryConfig, serviceDiscovery
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
}
}

View File

@ -28,6 +28,10 @@ import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.EmittingRequestLogger;
import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger;
@ -214,16 +218,10 @@ public class Initialization
)
throws Exception
{
final ServiceInstance serviceInstance = serviceInstance(
config.getServiceName(),
config.getHost(),
config.getPort()
);
final ServiceDiscovery serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class)
.basePath(config.getDiscoveryPath())
.client(discoveryClient)
.thisInstance(serviceInstance)
.build();
lifecycle.addHandler(
@ -251,6 +249,46 @@ public class Initialization
return serviceDiscovery;
}
public static ServiceAnnouncer makeServiceAnnouncer(
ServiceDiscoveryConfig config,
ServiceDiscovery serviceDiscovery
)
{
final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config);
return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory);
}
public static void announceDefaultService(
final ServiceDiscoveryConfig config,
final ServiceAnnouncer serviceAnnouncer,
final Lifecycle lifecycle
) throws Exception
{
final String service = config.getServiceName().replace('/', ':');
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(service);
}
@Override
public void stop()
{
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unannouce default service[%s]", service);
}
}
}
);
}
public static ServiceProvider makeServiceProvider(
String serviceName,
ServiceDiscovery serviceDiscovery,
@ -309,8 +347,9 @@ public class Initialization
);
}
public static ServiceInstance serviceInstance(final String service, final String host, final int port)
public static ServiceInstanceFactory<Void> makeServiceInstanceFactory(ServiceDiscoveryConfig config)
{
final String host = config.getHost();
final String address;
final int colon = host.indexOf(':');
if (colon < 0) {
@ -319,14 +358,6 @@ public class Initialization
address = host.substring(0, colon);
}
try {
return ServiceInstance.builder()
.name(service.replace('/', ':'))
.address(address)
.port(port)
.build();
} catch (Exception e) {
throw Throwables.propagate(e);
}
return new AddressPortServiceInstanceFactory(address, config.getPort());
}
}

View File

@ -4,10 +4,9 @@ import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.merger.worker.config.EventReceiverProviderConfig;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import java.util.concurrent.ConcurrentMap;
@ -21,92 +20,65 @@ public class EventReceiverProvider
private static final Logger log = new Logger(EventReceiverProvider.class);
private final EventReceiverProviderConfig config;
private final ServiceDiscovery<?> discovery;
private final ConcurrentMap<String, EventReceiverHolder> receivers;
private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, EventReceiver> receivers;
public EventReceiverProvider(
EventReceiverProviderConfig config,
ServiceDiscovery discovery
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.discovery = discovery;
this.serviceAnnouncer = serviceAnnouncer;
this.receivers = Maps.newConcurrentMap();
}
public void register(final String key, EventReceiver receiver)
{
log.info("Registering event receiver: %s", key);
final String service = serviceName(key);
log.info("Registering EventReceiver: %s", key);
final EventReceiverHolder holder = new EventReceiverHolder(
receiver,
isDiscoverable() ? Initialization.serviceInstance(
String.format(config.getServiceFormat(), key),
config.getHost(),
config.getPort()
) : null
);
if (receivers.putIfAbsent(key, holder) != null) {
if (receivers.putIfAbsent(key, receiver) != null) {
throw new ISE("Receiver already registered for key: %s", key);
}
if (isDiscoverable()) {
try {
discovery.registerService(holder.service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", holder.service.getName());
receivers.remove(key, holder);
}
try {
serviceAnnouncer.announce(service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
receivers.remove(key, receiver);
}
}
public void unregister(final String key)
{
final String service = serviceName(key);
log.info("Unregistering event receiver: %s", key);
final EventReceiverHolder holder = receivers.get(key);
if (holder == null) {
final EventReceiver receiver = receivers.get(key);
if (receiver == null) {
log.warn("Receiver not currently registered, ignoring: %s", key);
}
if (isDiscoverable()) {
try {
discovery.unregisterService(holder.service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", holder.service.getName());
}
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
receivers.remove(key);
receivers.remove(key, receiver);
}
public Optional<EventReceiver> get(final String key)
{
final EventReceiverHolder holder = receivers.get(key);
if (holder != null) {
return Optional.of(holder.receiver);
} else {
return Optional.absent();
}
return Optional.fromNullable(receivers.get(key));
}
public boolean isDiscoverable()
private String serviceName(String key)
{
return discovery != null;
}
private static class EventReceiverHolder
{
final EventReceiver receiver;
final ServiceInstance service;
private EventReceiverHolder(EventReceiver receiver, ServiceInstance service)
{
this.receiver = receiver;
this.service = service;
}
return String.format(config.getServiceFormat(), key);
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.common.actions.TaskActionClient;
@ -68,6 +69,7 @@ public class TaskMasterLifecycle
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final ServiceEmitter emitter
)
{
@ -101,7 +103,7 @@ public class TaskMasterLifecycle
final Lifecycle leaderLifecycle = new Lifecycle();
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(taskRunner);
Initialization.makeServiceDiscoveryClient(curator, serviceDiscoveryConfig, leaderLifecycle);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);

View File

@ -46,6 +46,9 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig;
@ -111,6 +114,7 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
@ -152,6 +156,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private DBI dbi = null;
private IndexerCoordinatorConfig config = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private TaskStorage taskStorage = null;
private TaskQueue taskQueue = null;
private TaskLockbox taskLockbox = null;
@ -256,6 +262,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
initializeMergeDBCoordinator();
initializeJacksonSubtypes();
initializeJacksonInjections();
initializeServiceDiscovery();
initializeTaskStorage();
initializeTaskLockbox();
initializeTaskQueue();
@ -365,6 +372,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
taskRunnerFactory,
resourceManagementSchedulerFactory,
getCuratorFramework(),
serviceAnnouncer,
emitter
);
getLifecycle().addManagedInstance(taskMasterLifecycle);
@ -556,6 +564,20 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
}
}
public void initializeServiceDiscovery() throws Exception
{
final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
if (serviceDiscovery == null) {
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, getLifecycle()
);
}
if (serviceAnnouncer == null) {
final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
}
}
public void initializeTaskQueue()
{
if (taskQueue == null) {

View File

@ -36,6 +36,10 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
@ -111,6 +115,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null;
private Server server = null;
private ExecutorServiceTaskRunner taskRunner = null;
@ -186,7 +191,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
initializeMonitors();
initializeMergerConfig();
initializeServiceDiscovery();
initializeCoordinatorServiceProvider();
initializeDataSegmentPusher();
initializeTaskToolbox();
initializeTaskRunner();
@ -387,16 +391,16 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void initializeServiceDiscovery() throws Exception
{
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, lifecycle
);
}
}
public void initializeCoordinatorServiceProvider()
{
if (serviceAnnouncer == null) {
final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
}
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getMasterService(),
@ -425,9 +429,17 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void initializeEventReceiverProvider()
{
if (eventReceiverProvider == null) {
final EventReceiverProviderConfig config = configFactory.build(EventReceiverProviderConfig.class);
final ServiceAnnouncer myServiceAnnouncer;
if (config.getServiceFormat() == null) {
log.info("EventReceiverProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
myServiceAnnouncer = new NoopServiceAnnouncer();
} else {
myServiceAnnouncer = serviceAnnouncer;
}
this.eventReceiverProvider = new EventReceiverProvider(
configFactory.build(EventReceiverProviderConfig.class),
serviceDiscovery
config,
myServiceAnnouncer
);
}
}

View File

@ -34,6 +34,9 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
@ -102,6 +105,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
private ServiceEmitter emitter = null;
private WorkerConfig workerConfig = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private WorkerTaskMonitor workerTaskMonitor = null;
@ -177,7 +181,6 @@ public class WorkerNode extends QueryableNode<WorkerNode>
initializeMonitors();
initializeMergerConfig();
initializeServiceDiscovery();
initializeCoordinatorServiceProvider();
initializeJacksonInjections();
initializeJacksonSubtypes();
initializeCuratorCoordinator();
@ -336,10 +339,6 @@ public class WorkerNode extends QueryableNode<WorkerNode>
getLifecycle()
);
}
}
public void initializeCoordinatorServiceProvider()
{
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getMasterService(),

View File

@ -39,6 +39,7 @@ import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
@ -174,6 +175,10 @@ public class MasterMain
serviceDiscoveryConfig,
lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
serviceDiscoveryConfig, serviceDiscovery
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
IndexingServiceClient indexingServiceClient = null;
if (druidMasterConfig.getMergerServiceName() != null) {