diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 68f978929d6..ef8a995295b 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; @@ -42,9 +41,9 @@ import com.metamx.druid.concurrent.Execs; import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; +import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.RequestLogger; -import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ZkPathsConfig; @@ -70,8 +69,6 @@ import java.lang.reflect.Field; import java.util.Arrays; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** @@ -349,14 +346,10 @@ public abstract class QueryableNode extends Registering private void initializeServerInventoryView() { if (serverInventoryView == null) { - final ExecutorService exec = Executors.newFixedThreadPool( - 1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build() - ); serverInventoryView = new ServerInventoryView( getConfigFactory().build(ServerInventoryViewConfig.class), getZkPaths(), getCuratorFramework(), - exec, getJsonMapper() ); lifecycle.addManagedInstance(serverInventoryView); diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java index b15b547a5f2..5da48dcea3e 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java @@ -24,11 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.MapMaker; +import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.druid.concurrent.Execs; import com.metamx.druid.curator.inventory.CuratorInventoryManager; import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy; import com.metamx.druid.curator.inventory.InventoryManagerConfig; +import com.metamx.druid.guice.ManageLifecycle; import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import org.apache.curator.framework.CuratorFramework; @@ -38,11 +41,11 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; /** */ +@ManageLifecycle public class ServerInventoryView implements ServerView, InventoryView { private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class); @@ -55,11 +58,11 @@ public class ServerInventoryView implements ServerView, InventoryView private static final Map removedSegments = new MapMaker().makeMap(); + @Inject public ServerInventoryView( final ServerInventoryViewConfig config, final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ExecutorService exec, final ObjectMapper jsonMapper ) { @@ -79,7 +82,7 @@ public class ServerInventoryView implements ServerView, InventoryView return zkPaths.getServedSegmentsPath(); } }, - exec, + Execs.singleThreaded("ServerInventoryView-%s"), new CuratorInventoryManagerStrategy() { @Override diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/curator/CuratorConfig.java similarity index 96% rename from client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java rename to client/src/main/java/com/metamx/druid/curator/CuratorConfig.java index d09c36f369e..10e7e381278 100644 --- a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java +++ b/client/src/main/java/com/metamx/druid/curator/CuratorConfig.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.initialization; +package com.metamx.druid.curator; import org.skife.config.Config; import org.skife.config.Default; diff --git a/client/src/main/java/com/metamx/druid/curator/CuratorModule.java b/client/src/main/java/com/metamx/druid/curator/CuratorModule.java new file mode 100644 index 00000000000..94fcee12b42 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/CuratorModule.java @@ -0,0 +1,29 @@ +package com.metamx.druid.curator; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.druid.guice.ConfigProvider; +import com.metamx.druid.guice.LazySingleton; +import com.metamx.druid.initialization.Initialization; +import org.apache.curator.framework.CuratorFramework; + +import java.io.IOException; + +/** + */ +public class CuratorModule implements Module +{ + @Override + public void configure(Binder binder) + { + ConfigProvider.bind(binder, CuratorConfig.class); + } + + @Provides @LazySingleton + public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException + { + return Initialization.makeCuratorFramework(config, lifecycle); + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java b/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java index a3156b3205d..035924a6e2b 100644 --- a/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/curator/discovery/CuratorServiceAnnouncer.java @@ -1,6 +1,7 @@ package com.metamx.druid.curator.discovery; import com.google.common.collect.Maps; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceInstance; @@ -10,18 +11,19 @@ import java.util.Map; /** * Uses the Curator Service Discovery recipe to announce services. */ -public class CuratorServiceAnnouncer implements ServiceAnnouncer +public class CuratorServiceAnnouncer implements ServiceAnnouncer { private static final Logger log = new Logger(CuratorServiceAnnouncer.class); - private final ServiceDiscovery discovery; - private final ServiceInstanceFactory instanceFactory; - private final Map> instanceMap = Maps.newHashMap(); + private final ServiceDiscovery discovery; + private final ServiceInstanceFactory instanceFactory; + private final Map> instanceMap = Maps.newHashMap(); private final Object monitor = new Object(); + @Inject public CuratorServiceAnnouncer( - ServiceDiscovery discovery, - ServiceInstanceFactory instanceFactory + ServiceDiscovery discovery, + ServiceInstanceFactory instanceFactory ) { this.discovery = discovery; @@ -31,7 +33,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer @Override public void announce(String service) throws Exception { - final ServiceInstance instance; + final ServiceInstance instance; synchronized (monitor) { if (instanceMap.containsKey(service)) { @@ -57,7 +59,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer @Override public void unannounce(String service) throws Exception { - final ServiceInstance instance; + final ServiceInstance instance; synchronized (monitor) { instance = instanceMap.get(service); diff --git a/client/src/main/java/com/metamx/druid/curator/discovery/DiscoveryModule.java b/client/src/main/java/com/metamx/druid/curator/discovery/DiscoveryModule.java new file mode 100644 index 00000000000..c5a2c701bf8 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/discovery/DiscoveryModule.java @@ -0,0 +1,43 @@ +package com.metamx.druid.curator.discovery; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.druid.guice.JsonConfigProvider; +import com.metamx.druid.guice.LazySingleton; +import com.metamx.druid.initialization.CuratorDiscoveryConfig; +import com.metamx.druid.initialization.DruidNodeConfig; +import com.metamx.druid.initialization.Initialization; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.discovery.ServiceDiscovery; + +/** + */ +public class DiscoveryModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class); + binder.bind(ServiceAnnouncer.class).to(CuratorServiceAnnouncer.class).in(LazySingleton.class); + } + + @Provides @LazySingleton + public ServiceDiscovery getServiceDiscovery( + CuratorFramework curator, + CuratorDiscoveryConfig config, + Lifecycle lifecycle + ) throws Exception + { + return Initialization.makeServiceDiscoveryClient(curator, config, lifecycle); + } + + @Provides @LazySingleton + public ServiceInstanceFactory getServiceInstanceFactory( + DruidNodeConfig nodeConfig + ) + { + return Initialization.makeServiceInstanceFactory(nodeConfig); + } +} diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java index 191c2ea4ad8..815ffbe1b9b 100644 --- a/client/src/main/java/com/metamx/druid/http/BrokerNode.java +++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java @@ -42,10 +42,11 @@ import com.metamx.druid.client.cache.MapCache; import com.metamx.druid.client.cache.MapCacheConfig; import com.metamx.druid.client.cache.MemcachedCache; import com.metamx.druid.client.cache.MemcachedCacheConfig; +import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.curator.discovery.ServiceInstanceFactory; +import com.metamx.druid.initialization.CuratorDiscoveryConfig; +import com.metamx.druid.initialization.DruidNodeConfig; import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.query.QueryToolChestWarehouse; import com.metamx.druid.query.ReflectionQueryToolChestWarehouse; @@ -227,17 +228,18 @@ public class BrokerNode extends QueryableNode { if (useDiscovery) { final Lifecycle lifecycle = getLifecycle(); - final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class); + final CuratorDiscoveryConfig curatorDiscoveryConfig = getConfigFactory().build(CuratorDiscoveryConfig.class); + final DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class); final CuratorFramework curatorFramework = Initialization.makeCuratorFramework( - serviceDiscoveryConfig, lifecycle + getConfigFactory().build(CuratorConfig.class), lifecycle ); - final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( - curatorFramework, serviceDiscoveryConfig, lifecycle + final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( + curatorFramework, curatorDiscoveryConfig, lifecycle ); final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer( - serviceDiscoveryConfig, serviceDiscovery + nodeConfig, serviceDiscovery ); - Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle); + Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle); } } diff --git a/client/src/main/java/com/metamx/druid/initialization/ConfigFactoryModule.java b/client/src/main/java/com/metamx/druid/initialization/ConfigFactoryModule.java new file mode 100644 index 00000000000..6d136c53da4 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/ConfigFactoryModule.java @@ -0,0 +1,46 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.config.Config; +import com.metamx.druid.guice.LazySingleton; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; + +/** + */ +public class ConfigFactoryModule implements Module +{ + @Override + public void configure(Binder binder) + { + + } + + @Provides @LazySingleton + public ConfigurationObjectFactory makeFactory(Properties props) + { + return Config.createFactory(props); + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorDiscoveryConfig.java similarity index 69% rename from client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java rename to client/src/main/java/com/metamx/druid/initialization/CuratorDiscoveryConfig.java index 04776d6545a..ee0f7ced76e 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/CuratorDiscoveryConfig.java @@ -19,21 +19,22 @@ package com.metamx.druid.initialization; -import org.skife.config.Config; +import com.fasterxml.jackson.annotation.JsonProperty; /** */ -public abstract class ServiceDiscoveryConfig extends CuratorConfig +public abstract class CuratorDiscoveryConfig { - @Config("druid.service") - public abstract String getServiceName(); + @JsonProperty + private String path = null; - @Config("druid.host") - public abstract String getHost(); + public String getPath() + { + return path; + } - @Config("druid.port") - public abstract int getPort(); - - @Config("druid.zk.paths.discoveryPath") - public abstract String getDiscoveryPath(); + public boolean useDiscovery() + { + return path != null; + } } diff --git a/client/src/main/java/com/metamx/druid/initialization/DruidNodeConfig.java b/client/src/main/java/com/metamx/druid/initialization/DruidNodeConfig.java new file mode 100644 index 00000000000..94f02c5edd5 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/DruidNodeConfig.java @@ -0,0 +1,102 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.utils.SocketUtil; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +/** + */ +public abstract class DruidNodeConfig +{ + @NotNull + private String serviceName = null; + + @NotNull + private String host = null; + + @Min(0) @Max(0xffff) + private int port = -1; + + @JsonCreator + public DruidNodeConfig( + @JsonProperty("service") String serviceName, + @JsonProperty("host") String host, + @JsonProperty("port") Integer port + ) + { + this.serviceName = serviceName; + + if (port == null) { + if (host == null) { + setHostAndPort(null, -1); + } + else if (host.contains(":")) { + try { + setHostAndPort(host, Integer.parseInt(host.split(":")[1])); + } + catch (Exception e) { + setHostAndPort(host, -1); + } + } + else { + final int openPort = SocketUtil.findOpenPort(8080); + setHostAndPort(String.format("%s:%d", host, openPort), openPort); + } + } + else { + if (host == null || host.contains(":")) { + setHostAndPort(host, port); + } + else { + setHostAndPort(String.format("%s:%d", host, port), port); + } + } + } + + private void setHostAndPort(String host, int port) + { + this.host = host; + this.port = port; + } + + @JsonProperty("service") + public String getServiceName() + { + return serviceName; + } + + @JsonProperty + public String getHost() + { + return host; + } + + @JsonProperty + public int getPort() + { + return port; + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java b/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java new file mode 100644 index 00000000000..edc01e65a26 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java @@ -0,0 +1,128 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.google.common.collect.Lists; +import com.google.inject.Binder; +import com.google.inject.Binding; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provider; +import com.google.inject.Provides; +import com.google.inject.TypeLiteral; +import com.google.inject.name.Named; +import com.google.inject.name.Names; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.metamx.druid.guice.DruidScopes; +import com.metamx.druid.guice.LazySingleton; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.service.ServiceEmitter; + +import java.lang.annotation.Annotation; +import java.util.List; +import java.util.Properties; + +/** + */ +public class EmitterModule implements Module +{ + private static final Logger log = new Logger(EmitterModule.class); + + private final Properties props; + + @Inject + public EmitterModule( + Properties props + ) + { + this.props = props; + } + + @Override + public void configure(Binder binder) + { + String emitterType = props.getProperty("druid.emitter", ""); + + binder.install(new LogEmitterModule()); + binder.install(new HttpEmitterModule()); + + binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(DruidScopes.SINGLETON); + } + + @Provides + @LazySingleton + public ServiceEmitter getServiceEmitter(DruidNodeConfig config, Emitter emitter) + { + final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter); + EmittingLogger.registerEmitter(retVal); + return retVal; + } + + private static class EmitterProvider implements Provider + { + private final String emitterType; + + private Emitter emitter = null; + + EmitterProvider( + String emitterType + ) + { + this.emitterType = emitterType; + } + + @Inject + public void inject(Injector injector) + { + final List> emitterBindings = injector.findBindingsByType(new TypeLiteral(){}); + + for (Binding binding : emitterBindings) { + if (Names.named(emitterType).equals(binding.getKey().getAnnotation())) { + emitter = binding.getProvider().get(); + break; + } + } + + if (emitter == null) { + List knownTypes = Lists.newArrayList(); + for (Binding binding : emitterBindings) { + final Annotation annotation = binding.getKey().getAnnotation(); + if (annotation != null) { + knownTypes.add(((Named) annotation).value()); + } + } + throw new ISE("Uknown emitter type, known types[%s]", knownTypes); + } + } + + + @Override + public Emitter get() + { + if (emitter == null) { + throw new ISE("Emitter was null, that's bad!"); + } + return emitter; + } + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/HttpEmitterConfig.java b/client/src/main/java/com/metamx/druid/initialization/HttpEmitterConfig.java new file mode 100644 index 00000000000..fd6955d3579 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/HttpEmitterConfig.java @@ -0,0 +1,36 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Duration; + +/** + */ +public abstract class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig +{ + @JsonProperty + private Duration timeOut = new Duration("PT5m"); + + public Duration getReadTimeout() + { + return timeOut; + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/HttpEmitterModule.java b/client/src/main/java/com/metamx/druid/initialization/HttpEmitterModule.java new file mode 100644 index 00000000000..8f7cc45018f --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/HttpEmitterModule.java @@ -0,0 +1,62 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.google.common.base.Supplier; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.druid.guice.JsonConfigProvider; +import com.metamx.druid.guice.LazySingleton; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.HttpPostEmitter; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; + +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; + +/** + */ +public class HttpEmitterModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.emitter.http", HttpEmitterConfig.class); + } + + @Provides @LazySingleton @Named("http") + public Emitter getEmitter(Supplier config, @Nullable SSLContext sslContext, Lifecycle lifecycle) + { + final HttpClientConfig.Builder builder = HttpClientConfig + .builder() + .withNumConnections(1) + .withReadTimeout(config.get().getReadTimeout()); + + if (sslContext != null) { + builder.withSslContext(sslContext); + } + + return new HttpPostEmitter(config.get(), HttpClientInit.createClient(builder.build(), lifecycle)); + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index 0d49ff0cefe..66366a3e739 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -27,6 +27,7 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; @@ -42,7 +43,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; -import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.zookeeper.data.Stat; import org.mortbay.jetty.Connector; @@ -221,16 +221,16 @@ public class Initialization return framework; } - public static ServiceDiscovery makeServiceDiscoveryClient( + public static ServiceDiscovery makeServiceDiscoveryClient( CuratorFramework discoveryClient, - ServiceDiscoveryConfig config, + CuratorDiscoveryConfig config, Lifecycle lifecycle ) throws Exception { - final ServiceDiscovery serviceDiscovery = + final ServiceDiscovery serviceDiscovery = ServiceDiscoveryBuilder.builder(Void.class) - .basePath(config.getDiscoveryPath()) + .basePath(config.getPath()) .client(discoveryClient) .build(); @@ -260,21 +260,21 @@ public class Initialization } public static ServiceAnnouncer makeServiceAnnouncer( - ServiceDiscoveryConfig config, - ServiceDiscovery serviceDiscovery + DruidNodeConfig config, + ServiceDiscovery serviceDiscovery ) { - final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config); + final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config); return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory); } public static void announceDefaultService( - final ServiceDiscoveryConfig config, + final DruidNodeConfig nodeConfig, final ServiceAnnouncer serviceAnnouncer, final Lifecycle lifecycle ) throws Exception { - final String service = config.getServiceName().replace('/', ':'); + final String service = nodeConfig.getServiceName().replace('/', ':'); lifecycle.addHandler( new Lifecycle.Handler() @@ -357,7 +357,7 @@ public class Initialization ); } - public static ServiceInstanceFactory makeServiceInstanceFactory(ServiceDiscoveryConfig config) + public static ServiceInstanceFactory makeServiceInstanceFactory(DruidNodeConfig config) { final String host = config.getHost(); final String address; diff --git a/client/src/main/java/com/metamx/druid/initialization/LogEmitterModule.java b/client/src/main/java/com/metamx/druid/initialization/LogEmitterModule.java new file mode 100644 index 00000000000..237a87ec1b6 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/LogEmitterModule.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import com.metamx.druid.guice.JsonConfigProvider; +import com.metamx.druid.guice.LazySingleton; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.core.LoggingEmitterConfig; + +/** + */ +public class LogEmitterModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.emitter.logging", LoggingEmitterConfig.class); + } + + @Provides @LazySingleton @Named("logging") + public Emitter makeEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper) + { + return new LoggingEmitter(config, jsonMapper); + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/PropertiesModule.java b/client/src/main/java/com/metamx/druid/initialization/PropertiesModule.java new file mode 100644 index 00000000000..3e1eb3efa1a --- /dev/null +++ b/client/src/main/java/com/metamx/druid/initialization/PropertiesModule.java @@ -0,0 +1,135 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.initialization; + +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.io.Closeables; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.metamx.common.config.Config; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.CuratorConfig; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.data.Stat; +import org.skife.config.ConfigurationObjectFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Properties; + +/** + */ +public class PropertiesModule implements Module +{ + private static final Logger log = new Logger(PropertiesModule.class); + + private final String propertiesFile; + + public PropertiesModule(String propertiesFile) + { + this.propertiesFile = propertiesFile; + } + + @Override + public void configure(Binder binder) + { + final Properties zkProps = new Properties(); + final Properties fileProps = new Properties(zkProps); + + // Note that zookeeper coordinates must be either in cmdLine or in runtime.properties + Properties sp = System.getProperties(); + + Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain + tmp_props.putAll(sp); + + final InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile); + if (stream == null) { + log.info("%s not found on classpath, relying only on system properties and zookeeper.", propertiesFile); + } else { + log.info("Loading properties from %s", propertiesFile); + try { + try { + fileProps.load(stream); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + finally { + Closeables.closeQuietly(stream); + } + } + + // log properties from file; stringPropertyNames() would normally cascade down into the sub Properties objects, but + // zkProps (the parent level) is empty at this point so it will only log properties from runtime.properties + for (String prop : fileProps.stringPropertyNames()) { + log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop)); + } + + final String zkHostsProperty = "druid.zk.service.host"; + + if (tmp_props.getProperty(zkHostsProperty) != null) { + final ConfigurationObjectFactory factory = Config.createFactory(tmp_props); + + ZkPathsConfig config; + try { + config = factory.build(ZkPathsConfig.class); + } + catch (IllegalArgumentException e) { + log.warn(e, "Unable to build ZkPathsConfig. Cannot load properties from ZK."); + config = null; + } + + if (config != null) { + Lifecycle lifecycle = new Lifecycle(); + try { + CuratorFramework curator = Initialization.makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle); + + lifecycle.start(); + + final Stat stat = curator.checkExists().forPath(config.getPropertiesPath()); + if (stat != null) { + final byte[] data = curator.getData().forPath(config.getPropertiesPath()); + zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8)); + } + + // log properties from zk + for (String prop : zkProps.stringPropertyNames()) { + log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop)); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + lifecycle.stop(); + } + } + } else { + log.warn("property[%s] not set, skipping ZK-specified properties.", zkHostsProperty); + } + + binder.bind(Properties.class).toInstance(tmp_props); + } +} diff --git a/common/pom.xml b/common/pom.xml index 31831cf71fa..9bc588bff7e 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -63,6 +63,10 @@ org.apache.curator curator-x-discovery + + org.hibernate + hibernate-validator + it.uniroma3.mat extendedset @@ -87,6 +91,14 @@ com.fasterxml.jackson.datatype jackson-datatype-joda + + com.fasterxml.jackson.dataformat + jackson-dataformat-smile + + + com.google.inject + guice + org.jdbi jdbi diff --git a/common/src/main/java/com/metamx/druid/concurrent/Execs.java b/common/src/main/java/com/metamx/druid/concurrent/Execs.java index 7b86ca9e76c..619a8199b00 100644 --- a/common/src/main/java/com/metamx/druid/concurrent/Execs.java +++ b/common/src/main/java/com/metamx/druid/concurrent/Execs.java @@ -23,6 +23,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; /** */ @@ -31,7 +33,19 @@ public class Execs public static ExecutorService singleThreaded(String nameFormat) { return Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build() + makeThreadFactory(nameFormat) ); } + + public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat) + { + return Executors.newSingleThreadScheduledExecutor( + makeThreadFactory(nameFormat) + ); + } + + public static ThreadFactory makeThreadFactory(String nameFormat) + { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build(); + } } diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index 1e73b731353..d6b0b3e47f2 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -19,6 +19,7 @@ package com.metamx.druid.db; +import com.google.common.base.Supplier; import com.metamx.common.logger.Logger; import org.apache.commons.dbcp.BasicDataSource; import org.skife.jdbi.v2.DBI; @@ -159,12 +160,14 @@ public class DbConnector } } - private final DbConnectorConfig config; + private final Supplier config; + private final Supplier dbTables; private final DBI dbi; - public DbConnector(DbConnectorConfig config) + public DbConnector(Supplier config, Supplier dbTables) { this.config = config; + this.dbTables = dbTables; this.dbi = new DBI(getDatasource()); } @@ -176,16 +179,28 @@ public class DbConnector private DataSource getDatasource() { - BasicDataSource dataSource = new BasicDataSource(); - dataSource.setUsername(config.getDatabaseUser()); - dataSource.setPassword(config.getDatabasePassword()); - dataSource.setUrl(config.getDatabaseConnectURI()); + DbConnectorConfig connectorConfig = config.get(); - if (config.isValidationQuery()) { - dataSource.setValidationQuery(config.getValidationQuery()); + BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUsername(connectorConfig.getUser()); + dataSource.setPassword(connectorConfig.getPassword()); + dataSource.setUrl(connectorConfig.getConnectURI()); + + if (connectorConfig.isUseValidationQuery()) { + dataSource.setValidationQuery(connectorConfig.getValidationQuery()); dataSource.setTestOnBorrow(true); } return dataSource; } + + public void createSegmentTable() + { + createSegmentTable(dbi, dbTables.get().getSegmentsTable()); + } + + public void createRulesTable() + { + createRuleTable(dbi, dbTables.get().getRulesTable()); + } } diff --git a/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java b/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java index fb7d1a99916..f6e15bbcc48 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java @@ -20,37 +20,60 @@ package com.metamx.druid.db; import com.fasterxml.jackson.annotation.JsonProperty; -import org.skife.config.Config; + +import javax.validation.constraints.NotNull; /** */ public abstract class DbConnectorConfig { - @JsonProperty("connectURI") - @Config("druid.database.connectURI") - public abstract String getDatabaseConnectURI(); + @JsonProperty + private boolean createTables = true; - @JsonProperty("user") - @Config("druid.database.user") - public abstract String getDatabaseUser(); + @JsonProperty + @NotNull + private String connectURI = null; - @JsonProperty("password") - @Config("druid.database.password") - public abstract String getDatabasePassword(); + @JsonProperty + @NotNull + private String user = null; - @JsonProperty("segmentTable") - @Config("druid.database.segmentTable") - public abstract String getSegmentTable(); + @JsonProperty + @NotNull + private String password = null; - @JsonProperty("useValidationQuery") - @Config("druid.database.validation") - public boolean isValidationQuery() { - return false; + @JsonProperty + private boolean useValidationQuery = false; + + @JsonProperty + private String validationQuery = "SELECT 1"; + + public boolean isCreateTables() + { + return createTables; + } + + public String getConnectURI() + { + return connectURI; + } + + public String getUser() + { + return user; + } + + public String getPassword() + { + return password; + } + + public boolean isUseValidationQuery() + { + return useValidationQuery; } - @JsonProperty("validationQuery") - @Config("druid.database.validationQuery") public String getValidationQuery() { - return "SELECT 1"; + return validationQuery; } } diff --git a/common/src/main/java/com/metamx/druid/db/DbTablesConfig.java b/common/src/main/java/com/metamx/druid/db/DbTablesConfig.java new file mode 100644 index 00000000000..96d7170f6fb --- /dev/null +++ b/common/src/main/java/com/metamx/druid/db/DbTablesConfig.java @@ -0,0 +1,69 @@ +package com.metamx.druid.db; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.ISE; + +import javax.validation.constraints.NotNull; + +/** + */ +public class DbTablesConfig +{ + public static DbTablesConfig fromBase(String base) + { + return new DbTablesConfig(base, null, null); + } + + @NotNull + private final String base; + + @NotNull + private final String segmentsTable; + + @NotNull + private final String ruleTable; + + @JsonCreator + public DbTablesConfig( + @JsonProperty("base") String base, + @JsonProperty("segments") String segmentsTable, + @JsonProperty("rules") String rulesTable + ) + { + + this.base = base; + this.segmentsTable = makeTableName(segmentsTable, "segments"); + this.ruleTable = makeTableName(rulesTable, "rules"); + } + + private String makeTableName(String explicitTableName, String defaultSuffix) + { + if (explicitTableName == null) { + if (base == null) { + throw new ISE("table[%s] unknown! Both base and %s were null!", defaultSuffix, defaultSuffix); + } + return String.format("%s_%s", base, defaultSuffix); + } + + return explicitTableName; + } + + @JsonProperty + public String getBase() + { + return base; + } + + @JsonProperty("segments") + public String getSegmentsTable() + { + return segmentsTable; + } + + @JsonProperty("rules") + public String getRulesTable() + { + return ruleTable; + } +} \ No newline at end of file diff --git a/common/src/main/java/com/metamx/druid/guice/ConfigProvider.java b/common/src/main/java/com/metamx/druid/guice/ConfigProvider.java new file mode 100644 index 00000000000..79eff1b41ff --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/ConfigProvider.java @@ -0,0 +1,45 @@ +package com.metamx.druid.guice; + +import com.google.common.base.Preconditions; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.skife.config.ConfigurationObjectFactory; + +/** + */ +public class ConfigProvider implements Provider +{ + public static void bind(Binder binder, Class clazz) + { + binder.bind(clazz).toProvider(of(clazz)).in(DruidScopes.SINGLETON); + } + + public static Provider of(Class clazz) + { + return new ConfigProvider(clazz); + } + + private final Class clazz; + + private T object = null; + + public ConfigProvider( + Class clazz + ) + { + this.clazz = clazz; + } + + @Inject + public void inject(ConfigurationObjectFactory factory) + { + object = factory.build(clazz); + } + + @Override + public T get() + { + return Preconditions.checkNotNull(object, "WTF!? Code misconfigured, inject() didn't get called."); + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/DruidGuiceExtensions.java b/common/src/main/java/com/metamx/druid/guice/DruidGuiceExtensions.java new file mode 100644 index 00000000000..e999b725aa2 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/DruidGuiceExtensions.java @@ -0,0 +1,15 @@ +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; + +/** + */ +public class DruidGuiceExtensions implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bindScope(LazySingleton.class, DruidScopes.SINGLETON); + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/DruidScopes.java b/common/src/main/java/com/metamx/druid/guice/DruidScopes.java new file mode 100644 index 00000000000..151325e587b --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/DruidScopes.java @@ -0,0 +1,71 @@ +package com.metamx.druid.guice; + +import com.google.inject.Inject; +import com.google.inject.Key; +import com.google.inject.Provider; +import com.google.inject.Scope; +import com.google.inject.Scopes; +import com.metamx.common.lifecycle.Lifecycle; + +/** + */ +public class DruidScopes +{ + public static final Scope SINGLETON = new Scope() + { + @Override + public Provider scope(Key key, Provider unscoped) + { + return Scopes.SINGLETON.scope(key, unscoped); + } + + @Override + public String toString() + { + return "DruidScopes.SINGLETON"; + } + }; + + public static final Scope LIFECYCLE = new Scope() + { + @Override + public Provider scope(final Key key, final Provider unscoped) + { + return new Provider() + { + + private Provider provider; + + @Inject + public void inject(final Lifecycle lifecycle) + { + provider = Scopes.SINGLETON.scope( + key, + new Provider() + { + + @Override + public T get() + { + return lifecycle.addManagedInstance(unscoped.get()); + } + } + ); + } + + @Override + public T get() + { + System.out.println(provider); + return provider.get(); + } + }; + } + + @Override + public String toString() + { + return "DruidScopes.LIFECYCLE"; + } + }; +} diff --git a/common/src/main/java/com/metamx/druid/guice/DruidSecondaryModule.java b/common/src/main/java/com/metamx/druid/guice/DruidSecondaryModule.java new file mode 100644 index 00000000000..da8d2ce59c1 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/DruidSecondaryModule.java @@ -0,0 +1,45 @@ +package com.metamx.druid.guice; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Module; +import com.metamx.druid.jackson.Json; +import com.metamx.druid.jackson.Smile; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; + +/** + */ +public class DruidSecondaryModule implements Module +{ + private final Properties properties; + private final ConfigurationObjectFactory factory; + private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; + + @Inject + public DruidSecondaryModule( + Properties properties, + ConfigurationObjectFactory factory, + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper + ) + { + this.properties = properties; + this.factory = factory; + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; + } + + @Override + public void configure(Binder binder) + { + binder.install(new DruidGuiceExtensions()); + binder.bind(Properties.class).toInstance(properties); + binder.bind(ConfigurationObjectFactory.class).toInstance(factory); + binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMapper); + binder.bind(ObjectMapper.class).annotatedWith(Smile.class).toInstance(smileMapper); + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/JsonConfigProvider.java b/common/src/main/java/com/metamx/druid/guice/JsonConfigProvider.java new file mode 100644 index 00000000000..64dd107d0f0 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/JsonConfigProvider.java @@ -0,0 +1,121 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.TypeLiteral; +import com.metamx.common.IAE; +import com.metamx.common.ISE; + +import javax.validation.ConstraintViolation; +import javax.validation.Validator; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + */ +public class JsonConfigProvider implements Provider> +{ + + private static final Joiner JOINER = Joiner.on(", "); + + public static void bind(Binder binder, String propertyBase, Class classToProvide) + { + binder.bind(new TypeLiteral>(){}).toProvider(of(propertyBase, classToProvide)).in(DruidScopes.SINGLETON); + } + + public static JsonConfigProvider of(String propertyBase, Class classToProvide) + { + return new JsonConfigProvider(propertyBase, classToProvide); + } + + private final String propertyBase; + private final Class classToProvide; + + private Supplier supplier; + + public JsonConfigProvider( + String propertyBase, + Class classToProvide + ) + { + this.propertyBase = propertyBase; + this.classToProvide = classToProvide; + } + + @Inject + public void inject( + Properties props, + ObjectMapper jsonMapper, + Validator validator + ) + { + Map jsonMap = Maps.newHashMap(); + for (String prop : props.stringPropertyNames()) { + if (prop.startsWith(propertyBase)) { + final String propValue = props.getProperty(prop); + try { + jsonMap.put(prop.substring(propertyBase.length()), jsonMapper.readValue(propValue, Object.class)); + } + catch (IOException e) { + throw new IAE("Unable to parse an object out of prop[%s]=[%s]", prop, propValue); + } + } + } + + final T config = jsonMapper.convertValue(jsonMap, classToProvide); + + final Set> violations = validator.validate(config); + if (!violations.isEmpty()) { + List messages = Lists.newArrayList(); + + for (ConstraintViolation violation : violations) { + messages.add(String.format("%s - %s", violation.getPropertyPath().toString(), violation.getMessage())); + } + + throw new ISE("Configuration violations[%s]", JOINER.join(messages)); + } + + this.supplier = new Supplier() + { + @Override + public T get() + { + return config; + } + }; + } + + @Override + public Supplier get() + { + return supplier; + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/LazySingleton.java b/common/src/main/java/com/metamx/druid/guice/LazySingleton.java new file mode 100644 index 00000000000..005e9795de4 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/LazySingleton.java @@ -0,0 +1,18 @@ +package com.metamx.druid.guice; + +import com.google.inject.ScopeAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + */ +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RUNTIME) +@ScopeAnnotation +public @interface LazySingleton +{ +} diff --git a/common/src/main/java/com/metamx/druid/guice/LifecycleModule.java b/common/src/main/java/com/metamx/druid/guice/LifecycleModule.java new file mode 100644 index 00000000000..b31bbaddd11 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/LifecycleModule.java @@ -0,0 +1,74 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; + +/** + * A Module to add lifecycle management to the injector. {@link DruidGuiceExtensions} must also be included. + */ +public class LifecycleModule implements Module +{ + private final LifecycleScope scope = new LifecycleScope(); + private final Key[] eagerClasses; + + /** + * A constructor that takes a list of classes to instantiate eagerly. Class {@link Key}s mentioned here will + * be pulled out of the injector with an injector.getInstance() call when the lifecycle is created. + * + * Eagerly loaded classes will *not* be automatically added to the Lifecycle unless they are bound to the proper + * scope. + * + * This mechanism exists to allow the {@link Lifecycle} to be the primary entry point from the injector, not to + * auto-register things with the {@link Lifecycle} + * + * @param eagerClasses set of classes to instantiate eagerly + */ + public LifecycleModule( + Key... eagerClasses + ) + { + this.eagerClasses = eagerClasses; + } + + @Override + public void configure(Binder binder) + { + binder.bindScope(ManageLifecycle.class, scope); + } + + @Provides @LazySingleton + public Lifecycle getLifecycle(Injector injector) + { + Lifecycle lifecycle = new Lifecycle(); + scope.setLifecycle(lifecycle); + + for (Key key : eagerClasses) { + injector.getInstance(key); // Pull the key so as to "eagerly" load up the class. + } + + return lifecycle; + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java b/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java new file mode 100644 index 00000000000..223113728e3 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java @@ -0,0 +1,67 @@ +package com.metamx.druid.guice; + +import com.google.common.collect.Lists; +import com.google.inject.Key; +import com.google.inject.Provider; +import com.google.inject.Scope; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; + +import java.util.List; + +/** + * A scope that adds objects to the Lifecycle. This is by definition also a lazy singleton scope. + */ +public class LifecycleScope implements Scope +{ + private static final Logger log = new Logger(LifecycleScope.class); + + private Lifecycle lifecycle; + private List instances = Lists.newLinkedList(); + + public void setLifecycle(Lifecycle lifecycle) + { + this.lifecycle = lifecycle; + synchronized (instances) { + for (Object instance : instances) { + lifecycle.addManagedInstance(instance); + } + } + } + + @Override + public Provider scope(final Key key, final Provider unscoped) + { + return new Provider() + { + private T value = null; + + @Override + public synchronized T get() + { + if (value == null) { + final T retVal = unscoped.get(); + + synchronized (instances) { + if (lifecycle == null) { + instances.add(retVal); + } + else { + try { + lifecycle.addMaybeStartManagedInstance(retVal); + } + catch (Exception e) { + log.warn(e, "Caught exception when trying to create a[%s]", key); + return null; + } + } + } + + value = retVal; + } + + return value; + } + }; + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/ManageLifecycle.java b/common/src/main/java/com/metamx/druid/guice/ManageLifecycle.java new file mode 100644 index 00000000000..35752a64b0d --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/ManageLifecycle.java @@ -0,0 +1,19 @@ +package com.metamx.druid.guice; + +import com.google.inject.ScopeAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Marks the object to be managed by {@link com.metamx.common.lifecycle.Lifecycle} + */ +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RUNTIME) +@ScopeAnnotation +public @interface ManageLifecycle +{ +} diff --git a/common/src/main/java/com/metamx/druid/jackson/JacksonModule.java b/common/src/main/java/com/metamx/druid/jackson/JacksonModule.java new file mode 100644 index 00000000000..aca59fd8bc3 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/jackson/JacksonModule.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.jackson; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.druid.guice.LazySingleton; + +/** + */ +public class JacksonModule implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class)); + } + + @Provides @LazySingleton @Json + public ObjectMapper jsonMapper() + { + return new DefaultObjectMapper(); + } + + @Provides @LazySingleton @Smile + public ObjectMapper smileMapper() + { + ObjectMapper retVal = new DefaultObjectMapper(new SmileFactory()); + retVal.getJsonFactory().setCodec(retVal); + return retVal; + } +} diff --git a/common/src/main/java/com/metamx/druid/jackson/Json.java b/common/src/main/java/com/metamx/druid/jackson/Json.java new file mode 100644 index 00000000000..b0cc6401613 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/jackson/Json.java @@ -0,0 +1,17 @@ +package com.metamx.druid.jackson; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Json +{ +} diff --git a/common/src/main/java/com/metamx/druid/jackson/Smile.java b/common/src/main/java/com/metamx/druid/jackson/Smile.java new file mode 100644 index 00000000000..cc1c5b376e5 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/jackson/Smile.java @@ -0,0 +1,17 @@ +package com.metamx.druid.jackson; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Smile +{ +} diff --git a/common/src/main/java/com/metamx/druid/utils/SocketUtil.java b/common/src/main/java/com/metamx/druid/utils/SocketUtil.java new file mode 100644 index 00000000000..efea5f3003f --- /dev/null +++ b/common/src/main/java/com/metamx/druid/utils/SocketUtil.java @@ -0,0 +1,39 @@ +package com.metamx.druid.utils; + +import com.metamx.common.ISE; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + */ +public class SocketUtil +{ + public static int findOpenPort(int startPort) + { + int currPort = startPort; + + while (currPort < 0xffff) { + ServerSocket socket = null; + try { + socket = new ServerSocket(currPort); + return currPort; + } + catch (IOException e) { + ++currPort; + } + finally { + if (socket != null) { + try { + socket.close(); + } + catch (IOException e) { + + } + } + } + } + + throw new ISE("Unable to find open port between[%d] and [%d]", startPort, currPort); + } +} diff --git a/common/src/test/java/com/metamx/druid/guice/LifecycleScopeTest.java b/common/src/test/java/com/metamx/druid/guice/LifecycleScopeTest.java new file mode 100644 index 00000000000..9948dfe1470 --- /dev/null +++ b/common/src/test/java/com/metamx/druid/guice/LifecycleScopeTest.java @@ -0,0 +1,231 @@ +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import junit.framework.Assert; +import org.junit.Test; + +/** + */ +public class LifecycleScopeTest +{ + @Test + public void testAnnotation() throws Exception + { + final Injector injector = Guice.createInjector( + new DruidGuiceExtensions(), + new LifecycleModule(), + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(TestInterface.class).to(AnnotatedClass.class); + } + } + ); + + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + + final TestInterface instance = injector.getInstance(TestInterface.class); + + testIt(injector, lifecycle, instance); + } + + @Test + public void testExplicit() throws Exception + { + final Injector injector = Guice.createInjector( + new DruidGuiceExtensions(), + new LifecycleModule(), + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(TestInterface.class).to(ExplicitClass.class).in(ManageLifecycle.class); + } + } + ); + + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + + final TestInterface instance = injector.getInstance(TestInterface.class); + + testIt(injector, lifecycle, instance); + } + + private void testIt(Injector injector, Lifecycle lifecycle, TestInterface instance) + throws Exception + { + Assert.assertEquals(0, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(0, instance.getRan()); + + instance.run(); + Assert.assertEquals(0, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(1, instance.getRan()); + + lifecycle.start(); + Assert.assertEquals(1, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(1, instance.getRan()); + + injector.getInstance(TestInterface.class).run(); // It's a singleton + Assert.assertEquals(1, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(2, instance.getRan()); + + lifecycle.stop(); + Assert.assertEquals(1, instance.getStarted()); + Assert.assertEquals(1, instance.getStopped()); + Assert.assertEquals(2, instance.getRan()); + } + + /** + * This is a test for documentation purposes. It's there to show what weird things Guice will do when + * it sees both the annotation and an explicit binding. + * + * @throws Exception + */ + @Test + public void testAnnotatedAndExplicit() throws Exception + { + final Injector injector = Guice.createInjector( + new DruidGuiceExtensions(), + new LifecycleModule(), + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(TestInterface.class).to(AnnotatedClass.class).in(ManageLifecycle.class); + } + } + ); + + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + + final TestInterface instance = injector.getInstance(TestInterface.class); + + Assert.assertEquals(0, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(0, instance.getRan()); + + instance.run(); + Assert.assertEquals(0, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(1, instance.getRan()); + + lifecycle.start(); + Assert.assertEquals(2, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(1, instance.getRan()); + + injector.getInstance(TestInterface.class).run(); // It's a singleton + Assert.assertEquals(2, instance.getStarted()); + Assert.assertEquals(0, instance.getStopped()); + Assert.assertEquals(2, instance.getRan()); + + lifecycle.stop(); + Assert.assertEquals(2, instance.getStarted()); + Assert.assertEquals(2, instance.getStopped()); + Assert.assertEquals(2, instance.getRan()); + } + + private static interface TestInterface + { + public void run(); + public int getStarted(); + public int getStopped(); + public int getRan(); + } + + @ManageLifecycle + public static class AnnotatedClass implements TestInterface + { + int started = 0; + int stopped = 0; + int ran = 0; + + @LifecycleStart + public void start() + { + ++started; + } + + @LifecycleStop + public void stop() + { + ++stopped; + } + + @Override + public void run() + { + ++ran; + } + + public int getStarted() + { + return started; + } + + public int getStopped() + { + return stopped; + } + + public int getRan() + { + return ran; + } + } + + public static class ExplicitClass implements TestInterface + { + int started = 0; + int stopped = 0; + int ran = 0; + + @LifecycleStart + public void start() + { + ++started; + } + + @LifecycleStop + public void stop() + { + ++stopped; + } + + @Override + public void run() + { + ++ran; + } + + public int getStarted() + { + return started; + } + + public int getStopped() + { + return stopped; + } + + public int getRan() + { + return ran; + } + } + +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java index c00df6bf252..63af3b31993 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java @@ -25,8 +25,8 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.initialization.DruidNodeConfig; import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.merger.common.actions.TaskActionClient; import com.metamx.druid.merger.common.actions.TaskActionClientFactory; import com.metamx.druid.merger.common.task.Task; @@ -65,7 +65,7 @@ public class TaskMasterLifecycle final TaskQueue taskQueue, final TaskActionClientFactory taskActionClientFactory, final IndexerCoordinatorConfig indexerCoordinatorConfig, - final ServiceDiscoveryConfig serviceDiscoveryConfig, + final DruidNodeConfig nodeConfig, final TaskRunnerFactory runnerFactory, final ResourceManagementSchedulerFactory managementSchedulerFactory, final CuratorFramework curator, @@ -103,7 +103,7 @@ public class TaskMasterLifecycle final Lifecycle leaderLifecycle = new Lifecycle(); leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(taskRunner); - Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); + Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, leaderLifecycle); leaderLifecycle.addManagedInstance(taskConsumer); leaderLifecycle.addManagedInstance(resourceManagementScheduler); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 8a2923431ec..1e613cb6675 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Charsets; import com.google.common.base.Optional; +import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -55,9 +56,10 @@ import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.RedirectFilter; import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.StatusServlet; +import com.metamx.druid.initialization.CuratorDiscoveryConfig; +import com.metamx.druid.initialization.DruidNodeConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.common.RetryPolicyFactory; import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; @@ -156,7 +158,7 @@ public class IndexerCoordinatorNode extends QueryableNode serviceDiscovery = null; private ServiceAnnouncer serviceAnnouncer = null; private TaskStorage taskStorage = null; private TaskQueue taskQueue = null; @@ -363,12 +365,12 @@ public class IndexerCoordinatorNode extends QueryableNode instanceFactory = Initialization.makeServiceInstanceFactory(nodeConfig); this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory); } } @@ -614,7 +617,7 @@ public class IndexerCoordinatorNode extends QueryableNodeofInstance(dbConnectorConfig), null).getDBI() // TODO ); } else { throw new ISE("Invalid storage implementation: %s", config.getStorageImpl()); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java index 66ee23eb0df..f53de4c0b93 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java @@ -43,10 +43,11 @@ import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.StatusServlet; +import com.metamx.druid.initialization.CuratorDiscoveryConfig; +import com.metamx.druid.initialization.DruidNodeConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerInit; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; @@ -56,8 +57,8 @@ import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory; import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.TaskConfig; -import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.merger.common.index.ChatHandlerProvider; +import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig; @@ -114,7 +115,7 @@ public class ExecutorNode extends BaseServerNode private WorkerConfig workerConfig = null; private DataSegmentPusher segmentPusher = null; private TaskToolboxFactory taskToolboxFactory = null; - private ServiceDiscovery serviceDiscovery = null; + private ServiceDiscovery serviceDiscovery = null; private ServiceAnnouncer serviceAnnouncer = null; private ServiceProvider coordinatorServiceProvider = null; private Server server = null; @@ -391,14 +392,15 @@ public class ExecutorNode extends BaseServerNode public void initializeServiceDiscovery() throws Exception { - final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class); + final CuratorDiscoveryConfig config = configFactory.build(CuratorDiscoveryConfig.class); if (serviceDiscovery == null) { this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( getCuratorFramework(), config, lifecycle ); } if (serviceAnnouncer == null) { - final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config); + DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class); + final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(nodeConfig); this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory); } if (coordinatorServiceProvider == null) { diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 9ef0a900de6..a6da4b30da7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -34,14 +34,12 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.QueryableNode; -import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.StatusServlet; +import com.metamx.druid.initialization.CuratorDiscoveryConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskLogConfig; @@ -332,7 +330,7 @@ public class WorkerNode extends QueryableNode public void initializeServiceDiscovery() throws Exception { if (serviceDiscovery == null) { - final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class); + final CuratorDiscoveryConfig config = getConfigFactory().build(CuratorDiscoveryConfig.class); this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( getCuratorFramework(), config, diff --git a/pom.xml b/pom.xml index d4ffa9c7ce3..3cb7eb0ffbc 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ UTF-8 - 0.22.3 + 0.23.0-SNAPSHOT 2.0.1-21-22 @@ -219,6 +219,11 @@ jackson-mapper-asl 1.9.11 + + org.hibernate + hibernate-validator + 5.0.1.Final + javax.inject javax.inject diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java index 708c61d9e24..4d51df1a8d5 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java @@ -21,17 +21,20 @@ package com.metamx.druid.db; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.inject.Inject; import com.metamx.common.MapUtils; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.guice.ManageLifecycle; import com.metamx.druid.master.rules.PeriodLoadRule; import com.metamx.druid.master.rules.Rule; - import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -52,12 +55,13 @@ import java.util.concurrent.atomic.AtomicReference; /** */ +@ManageLifecycle public class DatabaseRuleManager { public static void createDefaultRule( final DBI dbi, final String ruleTable, - final String defaultDatasource, + final String defaultTier, final ObjectMapper jsonMapper ) { @@ -72,7 +76,7 @@ public class DatabaseRuleManager String.format( "SELECT id from %s where datasource='%s';", ruleTable, - defaultDatasource + defaultTier ) ); @@ -94,8 +98,8 @@ public class DatabaseRuleManager ruleTable ) ) - .bind("id", String.format("%s_%s", defaultDatasource, version)) - .bind("dataSource", defaultDatasource) + .bind("id", String.format("%s_%s", defaultTier, version)) + .bind("dataSource", defaultTier) .bind("version", version) .bind("payload", jsonMapper.writeValueAsString(defaultRules)) .execute(); @@ -115,6 +119,7 @@ public class DatabaseRuleManager private final ObjectMapper jsonMapper; private final ScheduledExecutorService exec; private final DatabaseRuleManagerConfig config; + private final Supplier dbTables; private final DBI dbi; private final AtomicReference>> rules; @@ -122,18 +127,21 @@ public class DatabaseRuleManager private volatile boolean started = false; + @Inject public DatabaseRuleManager( ObjectMapper jsonMapper, - ScheduledExecutorService exec, DatabaseRuleManagerConfig config, + Supplier dbTables, DBI dbi ) { this.jsonMapper = jsonMapper; - this.exec = exec; this.config = config; + this.dbTables = dbTables; this.dbi = dbi; + this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"); + this.rules = new AtomicReference>>( new ConcurrentHashMap>() ); @@ -147,6 +155,7 @@ public class DatabaseRuleManager return; } + createDefaultRule(dbi, getRulesTable(), config.getDefaultTier(), jsonMapper); ScheduledExecutors.scheduleWithFixedDelay( exec, new Duration(0), @@ -192,8 +201,11 @@ public class DatabaseRuleManager return handle.createQuery( // Return latest version rule by dataSource String.format( - "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version", - config.getRuleTable() + "SELECT r.dataSource, r.payload " + + "FROM %1$s r " + + "INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds " + + "ON r.datasource = ds.datasource and r.version = ds.version", + getRulesTable() ) ).fold( Maps.>newHashMap(), @@ -255,8 +267,8 @@ public class DatabaseRuleManager if (theRules.get(dataSource) != null) { retVal.addAll(theRules.get(dataSource)); } - if (theRules.get(config.getDefaultDatasource()) != null) { - retVal.addAll(theRules.get(config.getDefaultDatasource())); + if (theRules.get(config.getDefaultTier()) != null) { + retVal.addAll(theRules.get(config.getDefaultTier())); } return retVal; } @@ -275,7 +287,7 @@ public class DatabaseRuleManager handle.createStatement( String.format( "INSERT INTO %s (id, dataSource, version, payload) VALUES (:id, :dataSource, :version, :payload)", - config.getRuleTable() + getRulesTable() ) ) .bind("id", String.format("%s_%s", dataSource, version)) @@ -303,4 +315,6 @@ public class DatabaseRuleManager return true; } + + private String getRulesTable() {return dbTables.get().getRulesTable();} } diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManagerConfig.java b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManagerConfig.java index 41e21a0943b..5f41945df58 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManagerConfig.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManagerConfig.java @@ -27,12 +27,9 @@ import org.skife.config.Default; */ public abstract class DatabaseRuleManagerConfig { - @Config("druid.database.ruleTable") - public abstract String getRuleTable(); - - @Config("druid.database.defaultDatasource") + @Config("druid.database.rules.defaultTier") @Default("_default") - public abstract String getDefaultDatasource(); + public abstract String getDefaultTier(); @Config("druid.database.rules.poll.duration") @Default("PT1M") diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index 2ea9056627f..fce933e6294 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -21,9 +21,11 @@ package com.metamx.druid.db; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.inject.Inject; import com.metamx.common.MapUtils; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; @@ -33,7 +35,8 @@ import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; - +import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.guice.ManageLifecycle; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -57,6 +60,7 @@ import java.util.concurrent.atomic.AtomicReference; /** */ +@ManageLifecycle public class DatabaseSegmentManager { private static final Logger log = new Logger(DatabaseSegmentManager.class); @@ -65,26 +69,30 @@ public class DatabaseSegmentManager private final ObjectMapper jsonMapper; private final ScheduledExecutorService exec; - private final DatabaseSegmentManagerConfig config; + private final Supplier config; + private final Supplier dbTables; private final AtomicReference> dataSources; private final DBI dbi; private volatile boolean started = false; + @Inject public DatabaseSegmentManager( ObjectMapper jsonMapper, - ScheduledExecutorService exec, - DatabaseSegmentManagerConfig config, + Supplier config, + Supplier dbTables, DBI dbi ) { this.jsonMapper = jsonMapper; - this.exec = exec; this.config = config; + this.dbTables = dbTables; this.dataSources = new AtomicReference>( new ConcurrentHashMap() ); this.dbi = dbi; + + this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"); } @LifecycleStart @@ -98,7 +106,7 @@ public class DatabaseSegmentManager ScheduledExecutors.scheduleWithFixedDelay( exec, new Duration(0), - config.getPollDuration(), + config.get().getPollDuration(), new Runnable() { @Override @@ -136,7 +144,7 @@ public class DatabaseSegmentManager public VersionedIntervalTimeline withHandle(Handle handle) throws Exception { return handle.createQuery( - String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", config.getSegmentTable()) + String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", getSegmentsTable()) ) .bind("dataSource", ds) .fold( @@ -204,7 +212,7 @@ public class DatabaseSegmentManager batch.add( String.format( "UPDATE %s SET used=1 WHERE id = '%s'", - config.getSegmentTable(), + getSegmentsTable(), segment.getIdentifier() ) ); @@ -234,7 +242,7 @@ public class DatabaseSegmentManager public Void withHandle(Handle handle) throws Exception { handle.createStatement( - String.format("UPDATE %s SET used=1 WHERE id = :id", config.getSegmentTable()) + String.format("UPDATE %s SET used=1 WHERE id = :id", getSegmentsTable()) ) .bind("id", segmentId) .execute(); @@ -268,7 +276,7 @@ public class DatabaseSegmentManager public Void withHandle(Handle handle) throws Exception { handle.createStatement( - String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", config.getSegmentTable()) + String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", getSegmentsTable()) ) .bind("dataSource", ds) .execute(); @@ -298,7 +306,7 @@ public class DatabaseSegmentManager public Void withHandle(Handle handle) throws Exception { handle.createStatement( - String.format("UPDATE %s SET used=0 WHERE id = :segmentID", config.getSegmentTable()) + String.format("UPDATE %s SET used=0 WHERE id = :segmentID", getSegmentsTable()) ).bind("segmentID", segmentID) .execute(); @@ -354,7 +362,7 @@ public class DatabaseSegmentManager public List withHandle(Handle handle) throws Exception { return handle.createQuery( - String.format("SELECT DISTINCT(datasource) FROM %s", config.getSegmentTable()) + String.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable()) ) .fold( Lists.newArrayList(), @@ -398,7 +406,7 @@ public class DatabaseSegmentManager public List> withHandle(Handle handle) throws Exception { return handle.createQuery( - String.format("SELECT payload FROM %s WHERE used=1", config.getSegmentTable()) + String.format("SELECT payload FROM %s WHERE used=1", getSegmentsTable()) ).list(); } } @@ -451,4 +459,8 @@ public class DatabaseSegmentManager log.error(e, "Problem polling DB."); } } + + private String getSegmentsTable() { + return dbTables.get().getSegmentsTable(); + } } diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManagerConfig.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManagerConfig.java index 25e90cd16bd..85532089243 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManagerConfig.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManagerConfig.java @@ -19,18 +19,18 @@ package com.metamx.druid.db; +import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; /** */ -public abstract class DatabaseSegmentManagerConfig +public class DatabaseSegmentManagerConfig { - @Config("druid.database.segmentTable") - public abstract String getSegmentTable(); + @JsonProperty + private Duration pollDuration = new Duration("PT1M"); - @Config("druid.database.poll.duration") - @Default("PT1M") - public abstract Duration getPollDuration(); + public Duration getPollDuration() + { + return pollDuration; + } } diff --git a/server/src/main/java/com/metamx/druid/guice/MasterModule.java b/server/src/main/java/com/metamx/druid/guice/MasterModule.java new file mode 100644 index 00000000000..39e6fe10e8c --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/MasterModule.java @@ -0,0 +1,54 @@ +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.druid.client.ServerInventoryViewConfig; +import com.metamx.druid.db.DbConnector; +import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.db.DbTablesConfig; +import com.metamx.druid.initialization.ZkPathsConfig; +import org.skife.jdbi.v2.DBI; + +/** + */ +public class MasterModule implements Module +{ + @Override + public void configure(Binder binder) + { + ConfigProvider.bind(binder, ZkPathsConfig.class); + ConfigProvider.bind(binder, ServerInventoryViewConfig.class); + ConfigProvider.bind(binder, DbConnectorConfig.class); + + JsonConfigProvider.bind(binder, "druid.database.tables", DbTablesConfig.class); + + } + + @Provides @LazySingleton + public DBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle) + { + if (config.isCreateTables()) { + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + dbConnector.createSegmentTable(); + dbConnector.createRulesTable(); + } + + @Override + public void stop() + { + + } + } + ); + } + + return dbConnector.getDBI(); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/ServerModule.java b/server/src/main/java/com/metamx/druid/guice/ServerModule.java new file mode 100644 index 00000000000..8440df3042b --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/ServerModule.java @@ -0,0 +1,16 @@ +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.metamx.druid.initialization.DruidNodeConfig; + +/** + */ +public class ServerModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid", DruidNodeConfig.class); + } +} diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index ff5b5ae58d7..3a17b71c7a4 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -26,6 +26,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; @@ -38,7 +39,6 @@ import com.metamx.druid.master.rules.Rule; import org.joda.time.Interval; import javax.annotation.Nullable; -import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 0eb9f53a360..87ef7dc64f1 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -21,57 +21,56 @@ package com.metamx.druid.http; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; +import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.collect.Iterables; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.TypeLiteral; import com.google.inject.servlet.GuiceFilter; +import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.client.ServerInventoryView; -import com.metamx.druid.client.ServerInventoryViewConfig; import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.concurrent.Execs; import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.JacksonConfigManager; +import com.metamx.druid.curator.CuratorModule; +import com.metamx.druid.curator.discovery.DiscoveryModule; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.db.DatabaseRuleManager; -import com.metamx.druid.db.DatabaseRuleManagerConfig; import com.metamx.druid.db.DatabaseSegmentManager; -import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DbConnector; -import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.guice.DruidGuiceExtensions; +import com.metamx.druid.guice.DruidSecondaryModule; +import com.metamx.druid.guice.LifecycleModule; +import com.metamx.druid.guice.MasterModule; +import com.metamx.druid.guice.ServerModule; +import com.metamx.druid.initialization.ConfigFactoryModule; +import com.metamx.druid.initialization.DruidNodeConfig; +import com.metamx.druid.initialization.EmitterModule; import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.PropertiesModule; import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ZkPathsConfig; -import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.jackson.JacksonModule; import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMasterConfig; import com.metamx.druid.master.LoadQueueTaskMaster; -import com.metamx.druid.utils.PropUtils; -import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.Emitters; +import com.metamx.druid.metrics.MetricsModule; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; import com.metamx.http.client.response.ToStringResponseHandler; -import com.metamx.metrics.JvmMonitor; -import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; -import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceProvider; -import org.joda.time.Duration; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; @@ -80,11 +79,9 @@ import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; import org.skife.jdbi.v2.DBI; +import javax.annotation.Nullable; import java.net.URL; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.Arrays; /** */ @@ -96,89 +93,41 @@ public class MasterMain { LogLevelAdjuster.register(); - final ObjectMapper jsonMapper = new DefaultObjectMapper(); - final Properties props = Initialization.loadProperties(); - final ConfigurationObjectFactory configFactory = Config.createFactory(props); - final Lifecycle lifecycle = new Lifecycle(); - - final HttpClientConfig.Builder httpClientConfigBuilder = HttpClientConfig.builder().withNumConnections(1); - - final String emitterTimeout = props.getProperty("druid.emitter.timeOut"); - if (emitterTimeout != null) { - httpClientConfigBuilder.withReadTimeout(new Duration(emitterTimeout)); - } - final HttpClient httpClient = HttpClientInit.createClient(httpClientConfigBuilder.build(), lifecycle); - - final ServiceEmitter emitter = new ServiceEmitter( - PropUtils.getProperty(props, "druid.service"), - PropUtils.getProperty(props, "druid.host"), - Emitters.create(props, httpClient, jsonMapper, lifecycle) + Injector injector = makeInjector( + DruidSecondaryModule.class, + new LifecycleModule(Key.get(MonitorScheduler.class)), + EmitterModule.class, + CuratorModule.class, + MetricsModule.class, + DiscoveryModule.class, + ServerModule.class, + MasterModule.class ); - EmittingLogger.registerEmitter(emitter); + + final ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class); + final ConfigurationObjectFactory configFactory = injector.getInstance(ConfigurationObjectFactory.class); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + + final ServiceEmitter emitter = injector.getInstance(ServiceEmitter.class); final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); - final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class); - CuratorFramework curatorFramework = Initialization.makeCuratorFramework( - serviceDiscoveryConfig, - lifecycle - ); + CuratorFramework curatorFramework = injector.getInstance(CuratorFramework.class); final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class); - final ExecutorService exec = Executors.newFixedThreadPool( - 1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build() - ); - ServerInventoryView serverInventoryView = new ServerInventoryView( - configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper - ); - lifecycle.addManagedInstance(serverInventoryView); + ServerInventoryView serverInventoryView = injector.getInstance(ServerInventoryView.class); - final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); - final DatabaseRuleManagerConfig databaseRuleManagerConfig = configFactory.build(DatabaseRuleManagerConfig.class); - final DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); - DbConnector.createSegmentTable(dbi, PropUtils.getProperty(props, "druid.database.segmentTable")); - DbConnector.createRuleTable(dbi, PropUtils.getProperty(props, "druid.database.ruleTable")); - DatabaseRuleManager.createDefaultRule( - dbi, databaseRuleManagerConfig.getRuleTable(), databaseRuleManagerConfig.getDefaultDatasource(), jsonMapper - ); - final DatabaseSegmentManager databaseSegmentManager = new DatabaseSegmentManager( - jsonMapper, - scheduledExecutorFactory.create(1, "DatabaseSegmentManager-Exec--%d"), - configFactory.build(DatabaseSegmentManagerConfig.class), - dbi - ); - final DatabaseRuleManager databaseRuleManager = new DatabaseRuleManager( - jsonMapper, - scheduledExecutorFactory.create(1, "DatabaseRuleManager-Exec--%d"), - databaseRuleManagerConfig, - dbi - ); - - final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); - final MonitorScheduler healthMonitor = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), - globalScheduledExec, - emitter, - ImmutableList.of( - new JvmMonitor(), - new SysMonitor() - ) - ); - lifecycle.addManagedInstance(healthMonitor); + final DatabaseSegmentManager databaseSegmentManager = injector.getInstance(DatabaseSegmentManager.class); + final DatabaseRuleManager databaseRuleManager = injector.getInstance(DatabaseRuleManager.class); final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class); + final DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class); - final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( - curatorFramework, - serviceDiscoveryConfig, - lifecycle - ); - final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer( - serviceDiscoveryConfig, serviceDiscovery - ); - Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle); + final ServiceDiscovery serviceDiscovery = injector.getInstance(Key.get(new TypeLiteral>(){})); + final ServiceAnnouncer serviceAnnouncer = injector.getInstance(ServiceAnnouncer.class); + Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle); IndexingServiceClient indexingServiceClient = null; if (druidMasterConfig.getMergerServiceName() != null) { @@ -187,9 +136,10 @@ public class MasterMain serviceDiscovery, lifecycle ); - indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); +// indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); TODO } + DBI dbi = injector.getInstance(DBI.class); final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class); DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable()); JacksonConfigManager configManager = new JacksonConfigManager( @@ -237,7 +187,7 @@ public class MasterMain ) ); - final Injector injector = Guice.createInjector( + final Injector injector2 = Guice.createInjector( new MasterServletModule( serverInventoryView, databaseSegmentManager, @@ -289,7 +239,7 @@ public class MasterMain final Context root = new Context(server, "/", Context.SESSIONS); root.addServlet(new ServletHolder(new StatusServlet()), "/status"); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addEventListener(new GuiceServletConfig(injector)); + root.addEventListener(new GuiceServletConfig(injector2)); root.addFilter( new FilterHolder( new RedirectFilter( @@ -304,4 +254,41 @@ public class MasterMain server.start(); server.join(); } + + private static Injector makeInjector(final Object... modules) + { + final Injector baseInjector = Guice.createInjector( + new DruidGuiceExtensions(), + new JacksonModule(), + new PropertiesModule("runtime.properties"), + new ConfigFactoryModule() + ); + + return Guice.createInjector( + Iterables.transform( + Arrays.asList(modules), + new Function() + { + @Override + @SuppressWarnings("unchecked") + public Module apply(@Nullable Object input) + { + if (input instanceof Module) { + baseInjector.injectMembers(input); + return (Module) input; + } + if (input instanceof Class) { + if (Module.class.isAssignableFrom((Class) input)) { + return baseInjector.getInstance((Class) input); + } + else { + throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class); + } + } + throw new ISE("Unknown module type[%s]", input.getClass()); + } + } + ) + ); + } } diff --git a/server/src/main/java/com/metamx/druid/http/MasterResource.java b/server/src/main/java/com/metamx/druid/http/MasterResource.java index 9bb59d79d43..5934fdd5f6e 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterResource.java +++ b/server/src/main/java/com/metamx/druid/http/MasterResource.java @@ -19,10 +19,10 @@ package com.metamx.druid.http; +import com.google.inject.Inject; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.LoadPeonCallback; -import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.POST; diff --git a/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java index f113051d506..089f137db1a 100644 --- a/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java @@ -19,12 +19,12 @@ package com.metamx.druid.loading; +import com.google.inject.Inject; import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.Segment; -import javax.inject.Inject; import java.util.Map; /** diff --git a/server/src/main/java/com/metamx/druid/metrics/MetricsModule.java b/server/src/main/java/com/metamx/druid/metrics/MetricsModule.java new file mode 100644 index 00000000000..610514bcc39 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/metrics/MetricsModule.java @@ -0,0 +1,77 @@ +package com.metamx.druid.metrics; + +import com.google.common.collect.Lists; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.logger.Logger; +import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.guice.LazySingleton; +import com.metamx.druid.guice.ManageLifecycle; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.JvmMonitor; +import com.metamx.metrics.Monitor; +import com.metamx.metrics.MonitorScheduler; +import com.metamx.metrics.MonitorSchedulerConfig; +import com.metamx.metrics.SysMonitor; + +import java.util.List; + +/** + * Sets up the {@link MonitorScheduler} to monitor things on a regular schedule. {@link Monitor}s must be explicitly + * bound in order to be loaded. + */ +public class MetricsModule implements Module +{ + private static final Logger log = new Logger(MetricsModule.class); + + private final Class[] monitors; + + /** + * A constructor that takes a list of {@link Monitor} classes to explicitly bind so that they will be instantiated + * + * @param monitors list of {@link Monitor} classes to explicitly bind + */ + public MetricsModule( + Class... monitors + ) + { + this.monitors = monitors; + } + + @Override + public void configure(Binder binder) + { + binder.bind(JvmMonitor.class).in(LazySingleton.class); + binder.bind(SysMonitor.class).in(LazySingleton.class); + + for (Class monitor : monitors) { + binder.bind(monitor).in(LazySingleton.class); + } + } + + @Provides @ManageLifecycle + public MonitorScheduler getMonitorScheduler(MonitorSchedulerConfig config, ServiceEmitter emitter, Injector injector) + { + List monitors = Lists.newArrayList(); + + for (Key key: injector.getBindings().keySet()) { + if (Monitor.class.isAssignableFrom(key.getClass())) { + final Monitor monitor = (Monitor) injector.getInstance(key); + + log.info("Adding monitor[%s]", monitor); + + monitors.add(monitor); + } + } + + return new MonitorScheduler( + config, + Execs.scheduledSingleThreaded("MonitorScheduler-%s"), + emitter, + monitors + ); + } +} diff --git a/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java b/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java index 7db7e3ba922..9898d85a76e 100644 --- a/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java +++ b/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java @@ -19,7 +19,6 @@ package com.metamx.druid.metrics; -import com.metamx.druid.client.DruidServer; import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.ServerManager; import com.metamx.emitter.service.ServiceEmitter; diff --git a/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java b/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java index 751cad29a13..223e55faa9d 100644 --- a/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java +++ b/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java @@ -19,10 +19,10 @@ package com.metamx.druid.db; +import com.google.common.base.Suppliers; import com.google.common.collect.Maps; import com.metamx.druid.jackson.DefaultObjectMapper; import org.easymock.EasyMock; -import org.joda.time.Duration; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,7 +32,6 @@ import org.skife.jdbi.v2.tweak.HandleCallback; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; /** */ @@ -48,21 +47,8 @@ public class DatabaseSegmentManagerTest dbi = EasyMock.createMock(DBI.class); manager = new DatabaseSegmentManager( new DefaultObjectMapper(), - EasyMock.createMock(ScheduledExecutorService.class), - new DatabaseSegmentManagerConfig() - { - @Override - public String getSegmentTable() - { - return null; - } - - @Override - public Duration getPollDuration() - { - return null; - } - }, + Suppliers.ofInstance(new DatabaseSegmentManagerConfig()), + Suppliers.ofInstance(DbTablesConfig.fromBase("test")), dbi );