1) Move the @Json and @Smile annotations to com.metamx.druid.guice.annotations

2) Changed ServiceAnnouncer to take a DruidNode object
3) Remove ServiceInstanceFactory interface and implementations
4) Add registrations to DiscoveryModule so that you can register nodes that should be announced on startup
5) Make the set of default Monitors configurable
This commit is contained in:
cheddar 2013-07-26 14:03:51 -07:00
parent 02ffd805c8
commit 6b9963b472
29 changed files with 489 additions and 203 deletions

View File

@ -1,27 +0,0 @@
package com.metamx.druid.curator.discovery;
import com.google.common.base.Throwables;
import org.apache.curator.x.discovery.ServiceInstance;
public class AddressPortServiceInstanceFactory implements ServiceInstanceFactory<Void>
{
private final String address;
private final int port;
public AddressPortServiceInstanceFactory(String address, int port)
{
this.address = address;
this.port = port;
}
@Override
public ServiceInstance<Void> create(String service)
{
try {
return ServiceInstance.<Void>builder().name(service).address(address).port(port).build();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -1,8 +1,10 @@
package com.metamx.druid.curator.discovery;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
@ -13,56 +15,66 @@ import java.util.Map;
*/
public class CuratorServiceAnnouncer implements ServiceAnnouncer
{
private static final Logger log = new Logger(CuratorServiceAnnouncer.class);
private static final EmittingLogger log = new EmittingLogger(CuratorServiceAnnouncer.class);
private final ServiceDiscovery<Void> discovery;
private final ServiceInstanceFactory<Void> instanceFactory;
private final Map<String, ServiceInstance<Void>> instanceMap = Maps.newHashMap();
private final Object monitor = new Object();
@Inject
public CuratorServiceAnnouncer(
ServiceDiscovery<Void> discovery,
ServiceInstanceFactory<Void> instanceFactory
ServiceDiscovery<Void> discovery
)
{
this.discovery = discovery;
this.instanceFactory = instanceFactory;
}
@Override
public void announce(String service) throws Exception
public void announce(DruidNode service)
{
final ServiceInstance<Void> instance;
final String serviceName = getServiceName(service);
final ServiceInstance<Void> instance;
synchronized (monitor) {
if (instanceMap.containsKey(service)) {
if (instanceMap.containsKey(serviceName)) {
log.warn("Ignoring request to announce service[%s]", service);
return;
} else {
instance = instanceFactory.create(service);
instanceMap.put(service, instance);
try {
instance = ServiceInstance.<Void>builder()
.name(serviceName)
.address(service.getHost())
.port(service.getPort())
.build();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
instanceMap.put(serviceName, instance);
}
}
try {
log.info("Announcing service[%s]", service);
discovery.registerService(instance);
} catch (Exception e) {
}
catch (Exception e) {
log.warn("Failed to announce service[%s]", service);
synchronized (monitor) {
instanceMap.remove(service);
instanceMap.remove(serviceName);
}
}
}
@Override
public void unannounce(String service) throws Exception
public void unannounce(DruidNode service)
{
final String serviceName = getServiceName(service);
final ServiceInstance<Void> instance;
synchronized (monitor) {
instance = instanceMap.get(service);
instance = instanceMap.get(serviceName);
if (instance == null) {
log.warn("Ignoring request to unannounce service[%s]", service);
return;
@ -72,12 +84,20 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer
log.info("Unannouncing service[%s]", service);
try {
discovery.unregisterService(instance);
} catch (Exception e) {
log.warn(e, "Failed to unannounce service[%s]", service);
} finally {
}
catch (Exception e) {
log.makeAlert(e, "Failed to unannounce service[%s], zombie znode perhaps in existence.", serviceName)
.addData("service", service)
.emit();
}
finally {
synchronized (monitor) {
instanceMap.remove(service);
instanceMap.remove(serviceName);
}
}
}
private String getServiceName(DruidNode service) {
return service.getServiceName().replaceAll("/", ":");
}
}

View File

@ -1,27 +1,175 @@
package com.metamx.druid.curator.discovery;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import javax.annotation.Nullable;
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be
* automatically announced at the end of the lifecycle start.
*
* In order for this to work a ServiceAnnouncer instance *must* be injected and instantiated first.
* This can often be achieved by registering ServiceAnnouncer.class with the LifecycleModule.
*/
public class DiscoveryModule implements Module
{
private static final String NAME = "DiscoveryModule:internal";
public final List<Key<Supplier<DruidNode>>> nodesToAnnounce = new CopyOnWriteArrayList<Key<Supplier<DruidNode>>>();
public boolean configured = false;
/**
* Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle.
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
*
* @return this, for chaining.
*/
public DiscoveryModule registerDefault()
{
return registerKey(Key.get(new TypeLiteral<Supplier<DruidNode>>(){}));
}
/**
* Requests that the annotated DruidNode instance be injected and published as part of the lifecycle.
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
*
* @param annotation The annotation instance to use in finding the DruidNode instance, usually a Named annotation
* @return this, for chaining.
*/
public DiscoveryModule register(Annotation annotation)
{
return registerKey(Key.get(new TypeLiteral<Supplier<DruidNode>>(){}, annotation));
}
/**
* Requests that the annotated DruidNode instance be injected and published as part of the lifecycle.
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
*
* @param annotation The annotation class to use in finding the DruidNode instance
* @return this, for chaining
*/
public DiscoveryModule register(Class<? extends Annotation> annotation)
{
return registerKey(Key.get(new TypeLiteral<Supplier<DruidNode>>(){}, annotation));
}
/**
* Requests that the keyed DruidNode instance be injected and published as part of the lifecycle.
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
*
* @param key The key to use in finding the DruidNode instance
* @return this, for chaining
*/
public DiscoveryModule registerKey(Key<Supplier<DruidNode>> key)
{
synchronized (nodesToAnnounce) {
Preconditions.checkState(!configured, "Cannot register key[%s] after configuration.", key);
}
nodesToAnnounce.add(key);
return this;
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class);
binder.bind(ServiceAnnouncer.class).to(CuratorServiceAnnouncer.class).in(LazySingleton.class);
synchronized (nodesToAnnounce) {
configured = true;
JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class);
binder.bind(CuratorServiceAnnouncer.class).in(LazySingleton.class);
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
binder.bind(ServiceAnnouncer.class)
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
.asEagerSingleton();
}
}
@Provides @LazySingleton @Named(NAME)
public CuratorServiceAnnouncer getServiceAnnouncer(
final CuratorServiceAnnouncer announcer,
final Injector injector,
final Lifecycle lifecycle
)
{
lifecycle.addHandler(
new Lifecycle.Handler()
{
private volatile List<Supplier<DruidNode>> nodes = null;
@Override
public void start() throws Exception
{
if (nodes == null) {
nodes = Lists.transform(
nodesToAnnounce,
new Function<Key<Supplier<DruidNode>>, Supplier<DruidNode>>()
{
@Nullable
@Override
public Supplier<DruidNode> apply(
@Nullable Key<Supplier<DruidNode>> input
)
{
return injector.getInstance(input);
}
}
);
}
for (Supplier<DruidNode> node : nodes) {
announcer.announce(node.get());
}
}
@Override
public void stop()
{
if (nodes != null) {
for (Supplier<DruidNode> node : nodes) {
announcer.unannounce(node.get());
}
}
}
},
Lifecycle.Stage.LAST
);
return announcer;
}
@Provides @LazySingleton
@ -33,12 +181,4 @@ public class DiscoveryModule implements Module
{
return Initialization.makeServiceDiscoveryClient(curator, config.get(), lifecycle);
}
@Provides @LazySingleton
public ServiceInstanceFactory<Void> getServiceInstanceFactory(
Supplier<DruidNodeConfig> nodeConfig
)
{
return Initialization.makeServiceInstanceFactory(nodeConfig.get());
}
}

View File

@ -1,18 +1,20 @@
package com.metamx.druid.curator.discovery;
import com.metamx.druid.initialization.DruidNode;
/**
* Does nothing.
*/
public class NoopServiceAnnouncer implements ServiceAnnouncer
{
@Override
public void unannounce(String service)
public void announce(DruidNode node)
{
}
@Override
public void announce(String service)
public void unannounce(DruidNode node)
{
}

View File

@ -1,11 +1,14 @@
package com.metamx.druid.curator.discovery;
import com.metamx.druid.initialization.DruidNode;
/**
* Announces our ability to serve a particular function. Multiple users may announce the same service, in which
* case they are treated as interchangeable instances of that service.
*/
public interface ServiceAnnouncer
{
public void announce(String service) throws Exception;
public void unannounce(String service) throws Exception;
public void announce(DruidNode node);
public void unannounce(DruidNode node);
}

View File

@ -1,8 +0,0 @@
package com.metamx.druid.curator.discovery;
import org.apache.curator.x.discovery.ServiceInstance;
public interface ServiceInstanceFactory<T>
{
public ServiceInstance<T> create(String service);
}

View File

@ -45,7 +45,7 @@ import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.QueryToolChestWarehouse;
@ -234,16 +234,14 @@ public class BrokerNode extends QueryableNode<BrokerNode>
if (useDiscovery) {
final Lifecycle lifecycle = getLifecycle();
final CuratorDiscoveryConfig curatorDiscoveryConfig = getConfigFactory().build(CuratorDiscoveryConfig.class);
final DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class);
final DruidNode nodeConfig = getConfigFactory().build(DruidNode.class);
final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
getConfigFactory().build(CuratorConfig.class), lifecycle
);
final ServiceDiscovery<Void> serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, curatorDiscoveryConfig, lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
nodeConfig, serviceDiscovery
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(serviceDiscovery);
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
}
}

View File

@ -29,7 +29,7 @@ import javax.validation.constraints.NotNull;
/**
*/
public class DruidNodeConfig
public class DruidNode
{
@NotNull
private String serviceName = null;
@ -41,9 +41,9 @@ public class DruidNodeConfig
private int port = -1;
@JsonCreator
public DruidNodeConfig(
@JsonProperty("host") String host,
public DruidNode(
@JsonProperty("service") String serviceName,
@JsonProperty("host") String host,
@JsonProperty("port") Integer port
)
{
@ -99,4 +99,14 @@ public class DruidNodeConfig
{
return port;
}
@Override
public String toString()
{
return "DruidNode{" +
"serviceName='" + serviceName + '\'' +
", host='" + host + '\'' +
", port=" + port +
'}';
}
}

View File

@ -35,6 +35,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.service.ServiceEmitter;
@ -73,9 +74,9 @@ public class EmitterModule implements Module
@Provides
@ManageLifecycle
public ServiceEmitter getServiceEmitter(Supplier<DruidNodeConfig> configSupplier, Emitter emitter)
public ServiceEmitter getServiceEmitter(@Self Supplier<DruidNode> configSupplier, Emitter emitter)
{
final DruidNodeConfig config = configSupplier.get();
final DruidNode config = configSupplier.get();
final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter);
EmittingLogger.registerEmitter(retVal);
return retVal;

View File

@ -36,10 +36,8 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.guice.DruidGuiceExtensions;
import com.metamx.druid.guice.DruidSecondaryModule;
import com.metamx.druid.http.EmittingRequestLogger;
@ -274,39 +272,35 @@ public class Initialization
}
public static ServiceAnnouncer makeServiceAnnouncer(
DruidNodeConfig config,
ServiceDiscovery<Void> serviceDiscovery
)
{
final ServiceInstanceFactory<Void> serviceInstanceFactory = makeServiceInstanceFactory(config);
return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory);
return new CuratorServiceAnnouncer(serviceDiscovery);
}
public static void announceDefaultService(
final DruidNodeConfig nodeConfig,
final DruidNode nodeConfig,
final ServiceAnnouncer serviceAnnouncer,
final Lifecycle lifecycle
) throws Exception
{
final String service = nodeConfig.getServiceName().replace('/', ':');
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(service);
serviceAnnouncer.announce(nodeConfig);
}
@Override
public void stop()
{
try {
serviceAnnouncer.unannounce(service);
serviceAnnouncer.unannounce(nodeConfig);
}
catch (Exception e) {
log.warn(e, "Failed to unannouce default service[%s]", service);
log.warn(e, "Failed to unannouce default service[%s]", nodeConfig.getServiceName());
}
}
}
@ -371,20 +365,6 @@ public class Initialization
);
}
public static ServiceInstanceFactory<Void> makeServiceInstanceFactory(DruidNodeConfig config)
{
final String host = config.getHost();
final String address;
final int colon = host.indexOf(':');
if (colon < 0) {
address = host;
} else {
address = host.substring(0, colon);
}
return new AddressPortServiceInstanceFactory(address, config.getPort());
}
public static Injector makeInjector(final Object... modules)
{
final Injector baseInjector = Guice.createInjector(

View File

@ -5,8 +5,8 @@ import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.druid.jackson.Json;
import com.metamx.druid.jackson.Smile;
import com.metamx.druid.guice.annotations.Json;
import com.metamx.druid.guice.annotations.Smile;
import org.skife.config.ConfigurationObjectFactory;
import javax.validation.Validator;

View File

@ -27,6 +27,7 @@ import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.util.Types;
import java.lang.annotation.Annotation;
import java.util.Properties;
/**
@ -36,9 +37,45 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
@SuppressWarnings("unchecked")
public static <T> void bind(Binder binder, String propertyBase, Class<T> classToProvide)
{
binder.bind(Key.get(Types.newParameterizedType(Supplier.class, classToProvide)))
.toProvider((Provider) of(propertyBase, classToProvide))
.in(LazySingleton.class);
bind(
binder,
propertyBase,
classToProvide,
(Key) Key.get(Types.newParameterizedType(Supplier.class, classToProvide))
);
}
@SuppressWarnings("unchecked")
public static <T> void bind(Binder binder, String propertyBase, Class<T> classToProvide, Annotation annotation)
{
bind(
binder,
propertyBase,
classToProvide,
(Key) Key.get(Types.newParameterizedType(Supplier.class, classToProvide), annotation)
);
}
@SuppressWarnings("unchecked")
public static <T> void bind(
Binder binder,
String propertyBase,
Class<T> classToProvide,
Class<? extends Annotation> annotation
)
{
bind(
binder,
propertyBase,
classToProvide,
(Key) Key.get(Types.newParameterizedType(Supplier.class, classToProvide), annotation)
);
}
@SuppressWarnings("unchecked")
public static <T> void bind(Binder binder, String propertyBase, Class<T> clazz, Key<Supplier<T>> key)
{
binder.bind(key).toProvider((Provider) of(propertyBase, clazz)).in(LazySingleton.class);
}
public static <T> JsonConfigProvider<T> of(String propertyBase, Class<T> classToProvide)

View File

@ -94,7 +94,7 @@ public class JsonConfigurator
);
}
log.info("Loaded class[%s] as [%s]", clazz, config);
log.info("Loaded class[%s] from props[%s] as [%s]", clazz, propertyBase, config);
return config;
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.guice;
import com.google.common.base.Preconditions;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -26,37 +27,116 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* A Module to add lifecycle management to the injector. {@link DruidGuiceExtensions} must also be included.
*/
public class LifecycleModule implements Module
{
private final LifecycleScope scope = new LifecycleScope();
private final Key<?>[] eagerClasses;
private final List<Key<?>> eagerClasses = new CopyOnWriteArrayList<Key<?>>();
public boolean configured = false;
/**
* A constructor that takes a list of classes to instantiate eagerly. Class {@link Key}s mentioned here will
* be pulled out of the injector with an injector.getInstance() call when the lifecycle is created.
* Registers a class to instantiate eagerly. Classes mentioned here will be pulled out of
* the injector with an injector.getInstance() call when the lifecycle is created.
*
* Eagerly loaded classes will *not* be automatically added to the Lifecycle unless they are bound to the proper
* scope.
* scope. That is, they are generally eagerly loaded because the loading operation will produce some beneficial
* side-effect even if nothing actually directly depends on the instance.
*
* This mechanism exists to allow the {@link Lifecycle} to be the primary entry point from the injector, not to
* auto-register things with the {@link Lifecycle}
* auto-register things with the {@link Lifecycle}. It is also possible to just bind things eagerly with Guice,
* it is not clear which is actually the best approach. This is more explicit, but eager bindings inside of modules
* is less error-prone.
*
* @param eagerClasses set of classes to instantiate eagerly
* @param clazz, the class to instantiate
* @return this, for chaining.
*/
public LifecycleModule(
Key<?>... eagerClasses
)
public LifecycleModule register(Class<?> clazz)
{
this.eagerClasses = eagerClasses;
return registerKey(Key.get(clazz));
}
/**
* Registers a class/annotation combination to instantiate eagerly. Classes mentioned here will be pulled out of
* the injector with an injector.getInstance() call when the lifecycle is created.
*
* Eagerly loaded classes will *not* be automatically added to the Lifecycle unless they are bound to the proper
* scope. That is, they are generally eagerly loaded because the loading operation will produce some beneficial
* side-effect even if nothing actually directly depends on the instance.
*
* This mechanism exists to allow the {@link Lifecycle} to be the primary entry point from the injector, not to
* auto-register things with the {@link Lifecycle}. It is also possible to just bind things eagerly with Guice,
* it is not clear which is actually the best approach. This is more explicit, but eager bindings inside of modules
* is less error-prone.
*
* @param clazz, the class to instantiate
* @param annotation The annotation instance to register with Guice, usually a Named annotation
* @return this, for chaining.
*/
public LifecycleModule register(Class<?> clazz, Annotation annotation)
{
return registerKey(Key.get(clazz, annotation));
}
/**
* Registers a class/annotation combination to instantiate eagerly. Classes mentioned here will be pulled out of
* the injector with an injector.getInstance() call when the lifecycle is created.
*
* Eagerly loaded classes will *not* be automatically added to the Lifecycle unless they are bound to the proper
* scope. That is, they are generally eagerly loaded because the loading operation will produce some beneficial
* side-effect even if nothing actually directly depends on the instance.
*
* This mechanism exists to allow the {@link Lifecycle} to be the primary entry point from the injector, not to
* auto-register things with the {@link Lifecycle}. It is also possible to just bind things eagerly with Guice,
* it is not clear which is actually the best approach. This is more explicit, but eager bindings inside of modules
* is less error-prone.
*
* @param clazz, the class to instantiate
* @param annotation The annotation class to register with Guice
* @return this, for chaining
*/
public LifecycleModule register(Class<?> clazz, Class<? extends Annotation> annotation)
{
return registerKey(Key.get(clazz, annotation));
}
/**
* Registers a key to instantiate eagerly. {@link Key}s mentioned here will be pulled out of
* the injector with an injector.getInstance() call when the lifecycle is created.
*
* Eagerly loaded classes will *not* be automatically added to the Lifecycle unless they are bound to the proper
* scope. That is, they are generally eagerly loaded because the loading operation will produce some beneficial
* side-effect even if nothing actually directly depends on the instance.
*
* This mechanism exists to allow the {@link Lifecycle} to be the primary entry point from the injector, not to
* auto-register things with the {@link Lifecycle}. It is also possible to just bind things eagerly with Guice,
* it is not clear which is actually the best approach. This is more explicit, but eager bindings inside of modules
* is less error-prone.
*
* @param key The key to use in finding the DruidNode instance
* @return this, for chaining
*/
public LifecycleModule registerKey(Key<?> key)
{
synchronized (eagerClasses) {
Preconditions.checkState(!configured, "Cannot register key[%s] after configuration.", key);
}
eagerClasses.add(key);
return this;
}
@Override
public void configure(Binder binder)
{
binder.bindScope(ManageLifecycle.class, scope);
synchronized (eagerClasses) {
configured = true;
binder.bindScope(ManageLifecycle.class, scope);
}
}
@Provides @LazySingleton

View File

@ -1,4 +1,4 @@
package com.metamx.druid.jackson;
package com.metamx.druid.guice.annotations;
import com.google.inject.BindingAnnotation;

View File

@ -0,0 +1,17 @@
package com.metamx.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Self
{
}

View File

@ -1,4 +1,4 @@
package com.metamx.druid.jackson;
package com.metamx.druid.guice.annotations;
import com.google.inject.BindingAnnotation;

View File

@ -26,6 +26,8 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.guice.annotations.Json;
import com.metamx.druid.guice.annotations.Smile;
/**
*/

View File

@ -6,6 +6,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.initialization.DruidNode;
import java.util.concurrent.ConcurrentMap;
@ -32,43 +33,40 @@ public class ChatHandlerProvider
this.handlers = Maps.newConcurrentMap();
}
public void register(final String key, ChatHandler handler)
public void register(final String service, ChatHandler handler)
{
final String service = serviceName(key);
log.info("Registering Eventhandler: %s", key);
log.info("Registering Eventhandler: %s", service);
if (handlers.putIfAbsent(key, handler) != null) {
throw new ISE("handler already registered for key: %s", key);
if (handlers.putIfAbsent(service, handler) != null) {
throw new ISE("handler already registered for service[%s]", service);
}
try {
serviceAnnouncer.announce(service);
serviceAnnouncer.announce(makeDruidNode(service));
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
handlers.remove(key, handler);
log.warn(e, "Failed to register service[%s]", service);
handlers.remove(service, handler);
}
}
public void unregister(final String key)
public void unregister(final String service)
{
final String service = serviceName(key);
log.info("Unregistering chat handler for service[%s]", service);
log.info("Unregistering chat handler: %s", key);
final ChatHandler handler = handlers.get(key);
final ChatHandler handler = handlers.get(service);
if (handler == null) {
log.warn("handler not currently registered, ignoring: %s", key);
log.info("handler not currently registered, ignoring: %s", service);
}
try {
serviceAnnouncer.unannounce(service);
serviceAnnouncer.unannounce(makeDruidNode(service));
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
handlers.remove(key, handler);
handlers.remove(service, handler);
}
public Optional<ChatHandler> get(final String key)
@ -76,8 +74,7 @@ public class ChatHandlerProvider
return Optional.fromNullable(handlers.get(key));
}
private String serviceName(String key)
{
return String.format(config.getServiceFormat(), key);
private DruidNode makeDruidNode(String service) {
return new DruidNode(service, config.getHost(), config.getPort());
}
}

View File

@ -32,7 +32,7 @@ import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -65,7 +65,7 @@ public class TaskMasterLifecycle
final TaskQueue taskQueue,
final TaskActionClientFactory taskActionClientFactory,
final IndexerCoordinatorConfig indexerCoordinatorConfig,
final DruidNodeConfig nodeConfig,
final DruidNode nodeConfig,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,

View File

@ -49,7 +49,6 @@ import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
@ -98,7 +97,7 @@ import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStr
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
@ -377,7 +376,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private void initializeTaskMasterLifecycle()
{
if (taskMasterLifecycle == null) {
final DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class);
final DruidNode nodeConfig = getConfigFactory().build(DruidNode.class);
taskMasterLifecycle = new TaskMasterLifecycle(
taskQueue,
taskActionClientFactory,
@ -588,9 +587,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
);
}
if (serviceAnnouncer == null) {
DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class);
final ServiceInstanceFactory<Void> instanceFactory = Initialization.makeServiceInstanceFactory(nodeConfig);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery);
}
}

View File

@ -1,13 +1,14 @@
package com.metamx.druid.indexing.worker.config;
import org.skife.config.Config;
import org.skife.config.DefaultNull;
public abstract class ChatHandlerProviderConfig
{
@Config("druid.indexer.chathandler.service")
@DefaultNull
public abstract String getServiceFormat();
@Config("druid.indexer.chathandler.publishDiscovery")
public boolean isPublishDiscovery()
{
return false;
}
@Config("druid.host")
public abstract String getHost();

View File

@ -39,7 +39,6 @@ import com.metamx.druid.BaseServerNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
@ -55,7 +54,6 @@ import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.indexing.worker.config.WorkerConfig;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
@ -399,9 +397,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
);
}
if (serviceAnnouncer == null) {
DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class);
final ServiceInstanceFactory<Void> instanceFactory = Initialization.makeServiceInstanceFactory(nodeConfig);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery);
}
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
@ -433,16 +429,13 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
final ServiceAnnouncer myServiceAnnouncer;
if (config.getServiceFormat() == null) {
if (config.isPublishDiscovery()) {
myServiceAnnouncer = serviceAnnouncer;
} else {
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
myServiceAnnouncer = new NoopServiceAnnouncer();
} else {
myServiceAnnouncer = serviceAnnouncer;
}
this.chatHandlerProvider = new ChatHandlerProvider(
config,
myServiceAnnouncer
);
this.chatHandlerProvider = new ChatHandlerProvider(config, myServiceAnnouncer);
}
}

View File

@ -33,7 +33,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.jackson.Json;
import com.metamx.druid.guice.annotations.Json;
import com.metamx.druid.master.rules.PeriodLoadRule;
import com.metamx.druid.master.rules.Rule;
import org.joda.time.DateTime;

View File

@ -49,7 +49,7 @@ public class MasterModule implements Module
JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
binder.bind(DruidMaster.class);
binder.bind(DruidMaster.class).asEagerSingleton();
binder.bind(ServerInventoryView.class);
binder.bind(DatabaseSegmentManager.class)

View File

@ -2,7 +2,8 @@ package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.DruidNode;
/**
*/
@ -11,6 +12,6 @@ public class ServerModule implements Module
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid", DruidNodeConfig.class);
JsonConfigProvider.bind(binder, "druid", DruidNode.class, Self.class);
}
}

View File

@ -19,31 +19,25 @@
package com.metamx.druid.http;
import com.google.common.base.Supplier;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.curator.discovery.DiscoveryModule;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.guice.DbConnectorModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.JacksonConfigManagerModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.MasterModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerInitializer;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.metrics.MonitorScheduler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
@ -62,14 +56,14 @@ public class MasterMain
LogLevelAdjuster.register();
Injector injector = Initialization.makeInjector(
new LifecycleModule(Key.get(MonitorScheduler.class), Key.get(DruidMaster.class)),
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.class,
DbConnectorModule.class,
JacksonConfigManagerModule.class,
CuratorModule.class,
new MetricsModule(),
DiscoveryModule.class,
new DiscoveryModule().register(Self.class),
ServerModule.class,
new JettyServerModule(new MasterJettyServerInitializer()),
MasterModule.class
@ -77,13 +71,7 @@ public class MasterMain
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
final Supplier<DruidNodeConfig> nodeConfig = injector.getInstance(Key.get(new TypeLiteral<Supplier<DruidNodeConfig>>(){}));
final ServiceAnnouncer serviceAnnouncer = injector.getInstance(ServiceAnnouncer.class);
try {
// TODO: Make the announcement work through the lifecycle
Initialization.announceDefaultService(nodeConfig.get(), serviceAnnouncer, lifecycle);
lifecycle.start();
}
catch (Throwable t) {

View File

@ -1,24 +1,28 @@
package com.metamx.druid.metrics;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import com.metamx.common.logger.Logger;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.SysMonitor;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Sets up the {@link MonitorScheduler} to monitor things on a regular schedule. {@link Monitor}s must be explicitly
@ -28,18 +32,30 @@ public class MetricsModule implements Module
{
private static final Logger log = new Logger(MetricsModule.class);
private final Class<? extends Monitor>[] monitors;
private final List<Class<? extends Monitor>> monitors = new CopyOnWriteArrayList<Class<? extends Monitor>>();
public boolean configured = false;
/**
* A constructor that takes a list of {@link Monitor} classes to explicitly bind so that they will be instantiated
*
* @param monitors list of {@link Monitor} classes to explicitly bind
*/
public MetricsModule(
Class<? extends Monitor>... monitors
)
public MetricsModule register(Class<? extends Monitor> monitorClazz)
{
this.monitors = monitors;
synchronized (monitors) {
Preconditions.checkState(!configured, "Cannot register monitor[%s] after configuration.", monitorClazz);
}
monitors.add(monitorClazz);
return this;
}
@Inject
public void setProperties(Properties props, JsonConfigurator configurator)
{
final MonitorsConfig config = configurator.configurate(
props,
"druid.monitoring",
MonitorsConfig.class
);
for (Class<? extends Monitor> monitorClazz : config.getMonitors()) {
register(monitorClazz);
}
}
@Override
@ -47,15 +63,18 @@ public class MetricsModule implements Module
{
JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class);
binder.bind(JvmMonitor.class).in(LazySingleton.class);
binder.bind(SysMonitor.class).in(LazySingleton.class); // TODO: allow for disabling of this monitor
for (Class<? extends Monitor> monitor : monitors) {
binder.bind(monitor).in(LazySingleton.class);
}
// Instantiate eagerly so that we get everything registered and put into the Lifecycle
binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness")))
.to(MonitorScheduler.class)
.asEagerSingleton();
}
@Provides @ManageLifecycle
@Provides
@ManageLifecycle
public MonitorScheduler getMonitorScheduler(
Supplier<DruidMonitorSchedulerConfig> config,
ServiceEmitter emitter,
@ -64,8 +83,8 @@ public class MetricsModule implements Module
{
List<Monitor> monitors = Lists.newArrayList();
for (Key<?> key: injector.getBindings().keySet()) {
if (Monitor.class.isAssignableFrom(key.getClass())) {
for (Key<?> key : injector.getBindings().keySet()) {
if (Monitor.class.isAssignableFrom(key.getTypeLiteral().getRawType())) {
final Monitor monitor = (Monitor) injector.getInstance(key);
log.info("Adding monitor[%s]", monitor);

View File

@ -0,0 +1,35 @@
package com.metamx.druid.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.SysMonitor;
import javax.validation.constraints.NotNull;
import java.util.List;
/**
*/
public class MonitorsConfig
{
@JsonProperty("monitors")
@NotNull
private List<Class<? extends Monitor>> monitors = ImmutableList.<Class<? extends Monitor>>builder()
.add(JvmMonitor.class)
.add(SysMonitor.class)
.build();
public List<Class<? extends Monitor>> getMonitors()
{
return monitors;
}
@Override
public String toString()
{
return "MonitorsConfig{" +
"monitors=" + monitors +
'}';
}
}