diff --git a/client/src/main/java/com/metamx/druid/BaseNode.java b/client/src/main/java/com/metamx/druid/BaseNode.java new file mode 100644 index 00000000000..7e75ea38347 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/BaseNode.java @@ -0,0 +1,413 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.druid.http.RequestLogger; +import com.metamx.druid.index.v1.serde.ComplexMetricRegistererer; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ServerConfig; +import com.metamx.druid.initialization.ZkClientConfig; +import com.metamx.druid.utils.PropUtils; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.Emitters; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import com.metamx.metrics.JvmMonitor; +import com.metamx.metrics.Monitor; +import com.metamx.metrics.MonitorScheduler; +import com.metamx.metrics.MonitorSchedulerConfig; +import com.metamx.metrics.SysMonitor; +import com.metamx.phonebook.PhoneBook; +import org.I0Itec.zkclient.ZkClient; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.jsontype.NamedType; +import org.codehaus.jackson.smile.SmileFactory; +import org.mortbay.jetty.Server; +import org.skife.config.ConfigurationObjectFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public abstract class BaseNode +{ + private final Logger log; + + private final Lifecycle lifecycle; + private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; + private final Properties props; + private final ConfigurationObjectFactory configFactory; + + private PhoneBook phoneBook = null; + private ServiceEmitter emitter = null; + private List monitors = null; + private Server server = null; + private ZkClient zkClient; + private ScheduledExecutorFactory scheduledExecutorFactory; + private RequestLogger requestLogger; + + private boolean initialized = false; + + public BaseNode( + Logger log, + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory + ) + { + this.log = log; + this.configFactory = configFactory; + this.props = props; + this.jsonMapper = jsonMapper; + this.lifecycle = lifecycle; + this.smileMapper = smileMapper; + + Preconditions.checkNotNull(props, "props"); + Preconditions.checkNotNull(lifecycle, "lifecycle"); + Preconditions.checkNotNull(jsonMapper, "jsonMapper"); + Preconditions.checkNotNull(smileMapper, "smileMapper"); + Preconditions.checkNotNull(configFactory, "configFactory"); + + Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile."); + } + + @SuppressWarnings("unchecked") + public T setZkClient(ZkClient zkClient) + { + checkFieldNotSetAndSet("zkClient", zkClient); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setPhoneBook(PhoneBook phoneBook) + { + checkFieldNotSetAndSet("phoneBook", phoneBook); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setEmitter(ServiceEmitter emitter) + { + checkFieldNotSetAndSet("emitter", emitter); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setMonitors(List monitors) + { + checkFieldNotSetAndSet("monitors", monitors); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setServer(Server server) + { + checkFieldNotSetAndSet("server", server); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setScheduledExecutorFactory(ScheduledExecutorFactory factory) + { + checkFieldNotSetAndSet("scheduledExecutorFactory", factory); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setRequestLogger(RequestLogger requestLogger) + { + checkFieldNotSetAndSet("requestLogger", requestLogger); + return (T) this; + } + + + @SuppressWarnings("unchecked") + public T registerJacksonSubtype(Class... clazzes) + { + jsonMapper.registerSubtypes(clazzes); + smileMapper.registerSubtypes(clazzes); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T registerJacksonSubtype(NamedType... namedTypes) + { + jsonMapper.registerSubtypes(namedTypes); + smileMapper.registerSubtypes(namedTypes); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T registerComplexMetric(ComplexMetricRegistererer registererer) + { + registererer.register(); + return (T) this; + } + + public Lifecycle getLifecycle() + { + return lifecycle; + } + + public ObjectMapper getJsonMapper() + { + return jsonMapper; + } + + public ObjectMapper getSmileMapper() + { + return smileMapper; + } + + public Properties getProps() + { + return props; + } + + public ConfigurationObjectFactory getConfigFactory() + { + return configFactory; + } + + public ZkClient getZkClient() + { + initializeZkClient(); + return zkClient; + } + + public PhoneBook getPhoneBook() + { + initializePhoneBook(); + return phoneBook; + } + + public ServiceEmitter getEmitter() + { + initializeEmitter(); + return emitter; + } + + public List getMonitors() + { + initializeMonitors(); + return monitors; + } + + public Server getServer() + { + initializeServer(); + return server; + } + + public ScheduledExecutorFactory getScheduledExecutorFactory() + { + initializeScheduledExecutorFactory(); + return scheduledExecutorFactory; + } + + public RequestLogger getRequestLogger() + { + initializeRequestLogger(); + return requestLogger; + } + + private void initializeRequestLogger() + { + if (requestLogger == null) { + try { + setRequestLogger(Initialization.makeRequestLogger(getScheduledExecutorFactory(), getProps())); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + lifecycle.addManagedInstance(requestLogger); + } + } + + private void initializeScheduledExecutorFactory() + { + if (scheduledExecutorFactory == null) { + setScheduledExecutorFactory(ScheduledExecutors.createFactory(getLifecycle())); + } + } + + private void initializeZkClient() + { + if (zkClient == null) { + setZkClient(Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle)); + } + } + + private void initializePhoneBook() + { + if (phoneBook == null) { + setPhoneBook( + Initialization.createPhoneBook( + jsonMapper, + getZkClient(), + "PhoneBook--%s", + lifecycle + ) + ); + } + } + + private void initializeServer() + { + if (server == null) { + setServer(Initialization.makeJettyServer(configFactory.build(ServerConfig.class))); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + log.info("Starting Jetty"); + server.start(); + } + + @Override + public void stop() + { + log.info("Stopping Jetty"); + try { + server.stop(); + } + catch (Exception e) { + log.error(e, "Exception thrown while stopping Jetty"); + } + } + } + ); + } + } + + private void initializeMonitors() + { + if (monitors == null) { + List theMonitors = Lists.newArrayList(); + theMonitors.add(new JvmMonitor()); + if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "false"))) { + theMonitors.add(new SysMonitor()); + } + + setMonitors(theMonitors); + } + } + + private void initializeEmitter() + { + if (emitter == null) { + final HttpClient httpClient = HttpClientInit.createClient( + HttpClientConfig.builder().withNumConnections(1).build(), lifecycle + ); + + setEmitter( + new ServiceEmitter( + PropUtils.getProperty(props, "druid.service"), + PropUtils.getProperty(props, "druid.host"), + Emitters.create(props, httpClient, jsonMapper, lifecycle) + ) + ); + } + EmittingLogger.registerEmitter(emitter); + } + + protected void init() throws Exception + { + doInit(); + initialized = true; + } + + protected abstract void doInit() throws Exception; + + @LifecycleStart + public synchronized void start() throws Exception + { + if (! initialized) { + init(); + } + + lifecycle.start(); + } + + @LifecycleStop + public synchronized void stop() + { + lifecycle.stop(); + } + + protected ScheduledExecutorService startMonitoring(List monitors) + { + final ScheduledExecutorService globalScheduledExec = getScheduledExecutorFactory().create(1, "Global--%d"); + final MonitorScheduler monitorScheduler = new MonitorScheduler( + getConfigFactory().build(MonitorSchedulerConfig.class), + globalScheduledExec, + getEmitter(), + monitors + ); + getLifecycle().addManagedInstance(monitorScheduler); + return globalScheduledExec; + } + + protected void checkFieldNotSetAndSet(String fieldName, Object value) + { + Class theClazz = this.getClass(); + while (theClazz != null && theClazz != Object.class) { + try { + final Field field = theClazz.getDeclaredField(fieldName); + field.setAccessible(true); + Preconditions.checkState(field.get(this) == null, "Cannot set %s once it has already been set.", fieldName); + + field.set(this, value); + return; + } + catch (NoSuchFieldException e) { + // Perhaps it is inherited? + theClazz = theClazz.getSuperclass(); + } + catch (IllegalAccessException e) { + throw Throwables.propagate(e); + } + } + + throw new ISE("Unknown field[%s] on class[%s]", fieldName, this.getClass()); + } +} diff --git a/client/src/main/java/com/metamx/druid/client/ClientSideServerView.java b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java similarity index 98% rename from client/src/main/java/com/metamx/druid/client/ClientSideServerView.java rename to client/src/main/java/com/metamx/druid/client/BrokerServerView.java index aa5aba6fc62..197f69a7765 100644 --- a/client/src/main/java/com/metamx/druid/client/ClientSideServerView.java +++ b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java @@ -40,9 +40,9 @@ import com.metamx.http.client.HttpClient; /** */ -public class ClientSideServerView implements MutableServerView +public class BrokerServerView implements MutableServerView { - private static final Logger log = new Logger(ClientSideServerView.class); + private static final Logger log = new Logger(BrokerServerView.class); private final Object lock = new Object(); @@ -55,7 +55,7 @@ public class ClientSideServerView implements MutableServerView private final ObjectMapper smileMapper; private final HttpClient httpClient; - public ClientSideServerView( + public BrokerServerView( QueryToolChestWarehouse warehose, ObjectMapper smileMapper, HttpClient httpClient diff --git a/client/src/main/java/com/metamx/druid/client/ClientConfig.java b/client/src/main/java/com/metamx/druid/client/ClientConfig.java index c04921f6d1c..1ba5240f542 100644 --- a/client/src/main/java/com/metamx/druid/client/ClientConfig.java +++ b/client/src/main/java/com/metamx/druid/client/ClientConfig.java @@ -23,19 +23,18 @@ import org.skife.config.Config; /** */ -public abstract class ClientConfig +public abstract class ClientConfig extends InventoryManagerConfig { - @Config("druid.zk.paths.announcementsPath") - public abstract String getAnnouncementsPath(); - - @Config("druid.zk.paths.servedSegmentsPath") - public abstract String getServedSegmentsPath(); - - public InventoryManagerConfig getClientInventoryManagerConfig() + public ClientConfig() { - return new InventoryManagerConfig( - getAnnouncementsPath(), - getServedSegmentsPath() - ); + super(null, null); } + + @Override + @Config("druid.zk.paths.announcementsPath") + public abstract String getInventoryIdPath(); + + @Override + @Config("druid.zk.paths.servedSegmentsPath") + public abstract String getInventoryPath(); } diff --git a/client/src/main/java/com/metamx/druid/http/BrokerMain.java b/client/src/main/java/com/metamx/druid/http/BrokerMain.java index e583393891f..a1856552a1f 100644 --- a/client/src/main/java/com/metamx/druid/http/BrokerMain.java +++ b/client/src/main/java/com/metamx/druid/http/BrokerMain.java @@ -19,56 +19,9 @@ package com.metamx.druid.http; -import java.util.Properties; -import java.util.concurrent.ScheduledExecutorService; - -import org.I0Itec.zkclient.ZkClient; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.smile.SmileFactory; -import org.mortbay.jetty.Server; -import org.mortbay.jetty.servlet.Context; -import org.mortbay.jetty.servlet.ServletHolder; -import org.skife.config.ConfigurationObjectFactory; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.servlet.GuiceFilter; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; -import com.metamx.druid.client.CachingClusteredClient; -import com.metamx.druid.client.ClientConfig; -import com.metamx.druid.client.ClientInventoryManager; -import com.metamx.druid.client.ClientSideServerView; -import com.metamx.druid.client.cache.CacheBroker; -import com.metamx.druid.client.cache.CacheMonitor; -import com.metamx.druid.client.cache.MapCacheBroker; -import com.metamx.druid.client.cache.MapCacheBrokerConfig; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; -import com.metamx.druid.initialization.ZkClientConfig; -import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.log.LogLevelAdjuster; -import com.metamx.druid.query.QueryToolChestWarehouse; -import com.metamx.druid.query.ReflectionQueryToolChestWarehouse; -import com.metamx.emitter.core.Emitters; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; -import com.metamx.metrics.JvmMonitor; -import com.metamx.metrics.Monitor; -import com.metamx.metrics.MonitorScheduler; -import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; -import com.metamx.phonebook.PhoneBook; -import com.netflix.curator.framework.CuratorFramework; -import com.netflix.curator.x.discovery.ServiceDiscovery; /** */ @@ -81,125 +34,20 @@ public class BrokerMain { LogLevelAdjuster.register(); - final ObjectMapper jsonMapper = new DefaultObjectMapper(); - final ObjectMapper smileMapper = new DefaultObjectMapper(new SmileFactory()); - smileMapper.getJsonFactory().setCodec(smileMapper); + Lifecycle lifecycle = new Lifecycle(); - final Properties props = Initialization.loadProperties(); - final Lifecycle lifecycle = new Lifecycle(); - final ConfigurationObjectFactory configFactory = Config.createFactory(props); - final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle); - final PhoneBook phoneBook = Initialization.createYellowPages( - jsonMapper, zkClient, "Client-ZKYP--%s", lifecycle + lifecycle.addManagedInstance( + BrokerNode.builder().build() ); - final HttpClient httpClient = HttpClientInit.createClient( - HttpClientConfig.builder() - .withNumConnections( - Integer.parseInt(props.getProperty("druid.client.http.connections")) - ) - .build(), - lifecycle - ); - final HttpClient emitterHttpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), lifecycle - ); - final ServiceEmitter emitter = new ServiceEmitter( - props.getProperty("druid.service"), - props.getProperty("druid.host"), - Emitters.create(props, emitterHttpClient, jsonMapper, lifecycle) - ); - - final QueryToolChestWarehouse warehouse = new ReflectionQueryToolChestWarehouse(); - final ClientConfig clientConfig = configFactory.build(ClientConfig.class); - final ClientSideServerView view = new ClientSideServerView(warehouse, smileMapper, httpClient); - final ClientInventoryManager clientInventoryManager = new ClientInventoryManager( - clientConfig.getClientInventoryManagerConfig(), - phoneBook, - view - ); - lifecycle.addManagedInstance(clientInventoryManager); - - final CacheBroker cacheBroker = MapCacheBroker.create( - configFactory.buildWithReplacements(MapCacheBrokerConfig.class, ImmutableMap.of("prefix", "druid.bard.cache")) - ); - final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, smileMapper); - lifecycle.addManagedInstance(baseClient); - - final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); - final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); - final MonitorScheduler monitorScheduler = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), - globalScheduledExec, - emitter, - ImmutableList.of( - new JvmMonitor(), - new SysMonitor(), - new CacheMonitor(cacheBroker) - ) - ); - lifecycle.addManagedInstance(monitorScheduler); - - final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class); - CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient( - serviceDiscoveryConfig.getZkHosts(), - lifecycle - ); - - final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( - curatorFramework, - configFactory.build(ServiceDiscoveryConfig.class), - lifecycle - ); - - final RequestLogger requestLogger = Initialization.makeRequestLogger( - scheduledExecutorFactory.create( - 1, - "RequestLogger--%d" - ), - props - ); - lifecycle.addManagedInstance(requestLogger); - - final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, emitter, baseClient); - - final Injector injector = Guice.createInjector(new ClientServletModule(texasRanger, clientInventoryManager, jsonMapper)); - final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class)); - final Context root = new Context(server, "/druid/v2", Context.SESSIONS); - - root.addServlet(new ServletHolder(new StatusServlet()), "/status"); - root.addServlet( - new ServletHolder(new QueryServlet(jsonMapper, smileMapper, texasRanger, emitter, requestLogger)), - "/*" - ); - - root.addEventListener(new GuiceServletConfig(injector)); - root.addFilter(GuiceFilter.class, "/heatmap/*", 0); - root.addFilter(GuiceFilter.class, "/datasources/*", 0); - try { lifecycle.start(); } catch (Throwable t) { - log.error(t, "Error when starting up. Failing."); - System.exit(1); + log.info(t, "Throwable caught at startup, committing seppuku"); + System.exit(2); } - Runtime.getRuntime().addShutdownHook( - new Thread( - new Runnable() - { - @Override - public void run() - { - log.info("Running shutdown hook"); - lifecycle.stop(); - } - } - ) - ); - - server.start(); - server.join(); + lifecycle.join(); } } diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java new file mode 100644 index 00000000000..665a2fbd427 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java @@ -0,0 +1,322 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.http; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.common.ISE; +import com.metamx.common.config.Config; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.druid.BaseNode; +import com.metamx.druid.client.BrokerServerView; +import com.metamx.druid.client.CachingClusteredClient; +import com.metamx.druid.client.ClientConfig; +import com.metamx.druid.client.ClientInventoryManager; +import com.metamx.druid.client.cache.CacheBroker; +import com.metamx.druid.client.cache.CacheMonitor; +import com.metamx.druid.client.cache.MapCacheBroker; +import com.metamx.druid.client.cache.MapCacheBrokerConfig; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ServiceDiscoveryConfig; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.query.QueryToolChestWarehouse; +import com.metamx.druid.query.ReflectionQueryToolChestWarehouse; +import com.metamx.druid.utils.PropUtils; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import com.metamx.metrics.Monitor; +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.x.discovery.ServiceDiscovery; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.smile.SmileFactory; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.List; +import java.util.Properties; + +/** + */ + +public class BrokerNode extends BaseNode +{ + private static final Logger log = new Logger(BrokerNode.class); + + private final List extraModules = Lists.newArrayList(); + private final List pathsForGuiceFilter = Lists.newArrayList(); + + private QueryToolChestWarehouse warehouse = null; + private HttpClient brokerHttpClient = null; + private CacheBroker cacheBroker = null; + + private boolean useDiscovery = true; + + public static Builder builder() + { + return new Builder(); + } + + public BrokerNode( + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory + ) + { + super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + } + + public QueryToolChestWarehouse getWarehouse() + { + initializeWarehouse(); + return warehouse; + } + + public BrokerNode setWarehouse(QueryToolChestWarehouse warehouse) + { + checkFieldNotSetAndSet("warehouse", warehouse); + return this; + } + + public HttpClient getBrokerHttpClient() + { + initializeBrokerHttpClient(); + return brokerHttpClient; + } + + public BrokerNode setBrokerHttpClient(HttpClient brokerHttpClient) + { + checkFieldNotSetAndSet("brokerHttpClient", brokerHttpClient); + return this; + } + + public CacheBroker getCacheBroker() + { + initializeCacheBroker(); + return cacheBroker; + } + + public BrokerNode setCacheBroker(CacheBroker cacheBroker) + { + checkFieldNotSetAndSet("cacheBroker", cacheBroker); + return this; + } + + public BrokerNode useDiscovery(boolean useDiscovery) + { + this.useDiscovery = useDiscovery; + return this; + } + + /** + * This method allows you to specify more Guice modules to use primarily for injected extra Jersey resources. + * I'd like to remove the Guice dependency for this, but I don't know how to set up Jersey without Guice... + * + * This is deprecated because at some point in the future, we will eliminate the Guice dependency and anything + * that uses this will break. Use at your own risk. + * + * @param module the module to register with Guice + * + * @return this + */ + @Deprecated + public BrokerNode addModule(Module module) + { + extraModules.add(module); + return this; + } + + /** + * This method is used to specify extra paths that the GuiceFilter should pay attention to. + * + * This is deprecated for the same reason that addModule is deprecated. + * + * @param path the path that the GuiceFilter should pay attention to. + * + * @return this + */ + @Deprecated + public BrokerNode addPathForGuiceFilter(String path) + { + pathsForGuiceFilter.add(path); + return this; + } + + @Override + protected void doInit() throws Exception + { + initializeWarehouse(); + initializeBrokerHttpClient(); + initializeCacheBroker(); + initializeDiscovery(); + + final Lifecycle lifecycle = getLifecycle(); + + final List monitors = getMonitors(); + monitors.add(new CacheMonitor(cacheBroker)); + startMonitoring(monitors); + + final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient); + final ClientInventoryManager clientInventoryManager = new ClientInventoryManager( + getConfigFactory().build(ClientConfig.class), getPhoneBook(), view + ); + lifecycle.addManagedInstance(clientInventoryManager); + + final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper()); + lifecycle.addManagedInstance(baseClient); + + + final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, getEmitter(), baseClient); + + List theModules = Lists.newArrayList(); + theModules.add(new ClientServletModule(texasRanger, clientInventoryManager, getJsonMapper())); + theModules.addAll(extraModules); + + final Injector injector = Guice.createInjector(theModules); + final Context root = new Context(getServer(), "/druid/v2", Context.SESSIONS); + + root.addServlet(new ServletHolder(new StatusServlet()), "/status"); + root.addServlet( + new ServletHolder(new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())), + "/*" + ); + + root.addEventListener(new GuiceServletConfig(injector)); + root.addFilter(GuiceFilter.class, "/datasources/*", 0); + + for (String path : pathsForGuiceFilter) { + root.addFilter(GuiceFilter.class, path, 0); + } + } + + private void initializeDiscovery() throws Exception + { + if (useDiscovery) { + final Lifecycle lifecycle = getLifecycle(); + + final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class); + CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient( + serviceDiscoveryConfig.getZkHosts(), lifecycle + ); + + final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( + curatorFramework, serviceDiscoveryConfig, lifecycle + ); + } + } + + private void initializeCacheBroker() + { + if (cacheBroker == null) { + setCacheBroker( + MapCacheBroker.create( + getConfigFactory().buildWithReplacements( + MapCacheBrokerConfig.class, + ImmutableMap.of("prefix", "druid.bard.cache") + ) + ) + ); + } + } + + private void initializeBrokerHttpClient() + { + if (brokerHttpClient == null) { + setBrokerHttpClient( + HttpClientInit.createClient( + HttpClientConfig + .builder() + .withNumConnections(PropUtils.getPropertyAsInt(getProps(), "druid.client.http.connections")) + .build(), + getLifecycle() + ) + ); + } + } + + private void initializeWarehouse() + { + if (warehouse == null) { + setWarehouse(new ReflectionQueryToolChestWarehouse()); + } + } + + public static class Builder + { + private ObjectMapper jsonMapper = null; + private ObjectMapper smileMapper = null; + private Lifecycle lifecycle = null; + private Properties props = null; + private ConfigurationObjectFactory configFactory = null; + + public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper) + { + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; + return this; + } + + public Builder withProps(Properties props) + { + this.props = props; + return this; + } + + public Builder withConfigFactory(ConfigurationObjectFactory configFactory) + { + this.configFactory = configFactory; + return this; + } + + public BrokerNode build() + { + if (jsonMapper == null && smileMapper == null) { + jsonMapper = new DefaultObjectMapper(); + smileMapper = new DefaultObjectMapper(new SmileFactory()); + smileMapper.getJsonFactory().setCodec(smileMapper); + } + else if (jsonMapper == null || smileMapper == null) { + throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); + } + + if (lifecycle == null) { + lifecycle = new Lifecycle(); + } + + if (props == null) { + props = Initialization.loadProperties(); + } + + if (configFactory == null) { + configFactory = Config.createFactory(props); + } + + return new BrokerNode(props, lifecycle, jsonMapper, smileMapper, configFactory); + } + } +} diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index 301070d9093..fec4d0826e6 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -38,11 +38,13 @@ import com.google.common.base.Throwables; import com.google.common.io.Closeables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.client.ZKPhoneBook; import com.metamx.druid.http.FileRequestLogger; import com.metamx.druid.http.RequestLogger; +import com.metamx.druid.utils.PropUtils; import com.metamx.druid.zk.StringZkSerializer; import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFrameworkFactory; @@ -88,7 +90,7 @@ public class Initialization return retVal; } - public static ZKPhoneBook createYellowPages( + public static ZKPhoneBook createPhoneBook( ObjectMapper jsonMapper, ZkClient zkClient, String threadNameFormat, Lifecycle lifecycle ) { @@ -271,15 +273,11 @@ public class Initialization return serviceProvider; } - public static RequestLogger makeRequestLogger(ScheduledExecutorService exec, Properties props) throws IOException + public static RequestLogger makeRequestLogger(ScheduledExecutorFactory factory, Properties props) throws IOException { - final String property = "druid.request.logging.dir"; - final String loggingDir = props.getProperty(property); - - if (loggingDir == null) { - throw new ISE("property[%s] not set.", property); - } - - return new FileRequestLogger(exec, new File(loggingDir)); + return new FileRequestLogger( + factory.create(1, "RequestLogger-%s"), + new File(PropUtils.getProperty(props, "druid.request.logging.dir")) + ); } } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index 2961ca4276b..9ece02720e2 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -77,7 +77,7 @@ public class IndexIO public static void registerHandler(IndexIOHandler handler) { - if (handler == null) { + if (IndexIO.handler == null) { IndexIO.handler = handler; } else { diff --git a/server/src/main/java/com/metamx/druid/coordination/legacy/TheSizeAdjusterConfig.java b/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricRegistererer.java similarity index 58% rename from server/src/main/java/com/metamx/druid/coordination/legacy/TheSizeAdjusterConfig.java rename to index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricRegistererer.java index a79d20df796..c0dcade7ed9 100644 --- a/server/src/main/java/com/metamx/druid/coordination/legacy/TheSizeAdjusterConfig.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricRegistererer.java @@ -17,14 +17,17 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.coordination.legacy; - -import org.skife.config.Config; +package com.metamx.druid.index.v1.serde; /** + * This is a "factory" interface for registering complex metrics in the system. It exists because I'm unaware of + * another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface + * must be instantiatable via a no argument default constructor (the MR jobs on Hadoop use reflection to instantiate + * instances). + * + * The name is not a typo, I felt that it needed an extra "er" to make the pronunciation that much more difficult. */ -public abstract class TheSizeAdjusterConfig +public interface ComplexMetricRegistererer { - @Config("druid.zk.paths.indexesPath") - public abstract String getSegmentBasePath(); + public void register(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 4c76224dd14..b65f562ce12 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -380,8 +380,8 @@ public class IndexerCoordinatorNode if (taskToolbox == null) { final RestS3Service s3Client = new RestS3Service( new AWSCredentials( - props.getProperty("com.metamx.aws.accessKey"), - props.getProperty("com.metamx.aws.secretKey") + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") ) ); final SegmentPusher segmentPusher = new S3SegmentPusher( @@ -435,7 +435,7 @@ public class IndexerCoordinatorNode { if (taskInventoryManager == null) { final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle); - final PhoneBook masterYp = Initialization.createYellowPages( + final PhoneBook masterYp = Initialization.createPhoneBook( jsonMapper, zkClient, "Master-ZKYP--%s", diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 2f45f094b53..533bc7b6c28 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -280,8 +280,8 @@ public class WorkerNode if (taskToolbox == null) { final RestS3Service s3Client = new RestS3Service( new AWSCredentials( - props.getProperty("com.metamx.aws.accessKey"), - props.getProperty("com.metamx.aws.secretKey") + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") ) ); final SegmentPusher segmentPusher = new S3SegmentPusher( @@ -296,7 +296,7 @@ public class WorkerNode public void initializeCuratorFramework() throws IOException { curatorFramework = Initialization.makeCuratorFrameworkClient( - props.getProperty("druid.zk.service.host"), + PropUtils.getProperty(props, "druid.zk.service.host"), lifecycle ); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java index 0b8d6efac45..60d290992d5 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java @@ -19,75 +19,53 @@ package com.metamx.druid.realtime; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ScheduledExecutorService; - -import org.I0Itec.zkclient.ZkClient; -import org.codehaus.jackson.map.BeanProperty; -import org.codehaus.jackson.map.DeserializationContext; -import org.codehaus.jackson.map.InjectableValues; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.jsontype.NamedType; -import org.codehaus.jackson.smile.SmileFactory; -import org.codehaus.jackson.type.TypeReference; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; -import org.mortbay.jetty.Server; -import org.mortbay.jetty.servlet.Context; -import org.mortbay.jetty.servlet.ServletHolder; -import org.skife.config.ConfigurationObjectFactory; - -import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.metamx.common.ISE; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import com.metamx.druid.BaseServerNode; import com.metamx.druid.client.ClientConfig; import com.metamx.druid.client.ClientInventoryManager; import com.metamx.druid.client.MutableServerView; import com.metamx.druid.client.OnlyNewSegmentWatcherServerView; import com.metamx.druid.client.ServerView; -import com.metamx.druid.collect.StupidPool; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.QueryServlet; -import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServerInit; -import com.metamx.druid.initialization.ZkClientConfig; import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.utils.PropUtils; -import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.Emitters; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; -import com.metamx.metrics.JvmMonitor; import com.metamx.metrics.Monitor; -import com.metamx.metrics.MonitorScheduler; -import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; -import com.metamx.phonebook.PhoneBook; +import org.codehaus.jackson.map.BeanProperty; +import org.codehaus.jackson.map.DeserializationContext; +import org.codehaus.jackson.map.InjectableValues; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.smile.SmileFactory; +import org.codehaus.jackson.type.TypeReference; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.security.AWSCredentials; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; +import org.skife.config.ConfigurationObjectFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; /** */ -public class RealtimeNode +public class RealtimeNode extends BaseServerNode { private static final Logger log = new Logger(RealtimeNode.class); @@ -96,155 +74,112 @@ public class RealtimeNode return new Builder(); } - private final Lifecycle lifecycle; - private final ObjectMapper jsonMapper; - private final ObjectMapper smileMapper; - private final Properties props; - private final ConfigurationObjectFactory configFactory; - private final Map injectablesMap = Maps.newLinkedHashMap(); - private PhoneBook phoneBook = null; - private ServiceEmitter emitter = null; - private ServerView view = null; private MetadataUpdater metadataUpdater = null; - private QueryRunnerFactoryConglomerate conglomerate = null; private SegmentPusher segmentPusher = null; private List fireDepartments = null; - private List monitors = null; - private Server server = null; + private ServerView view = null; private boolean initialized = false; public RealtimeNode( - ObjectMapper jsonMapper, - ObjectMapper smileMapper, - Lifecycle lifecycle, - Properties props, - ConfigurationObjectFactory configFactory + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory ) { - this.jsonMapper = jsonMapper; - this.smileMapper = smileMapper; - this.lifecycle = lifecycle; - this.props = props; - this.configFactory = configFactory; - } - - public RealtimeNode setPhoneBook(PhoneBook phoneBook) - { - this.phoneBook = phoneBook; - return this; - } - - public RealtimeNode setEmitter(ServiceEmitter emitter) - { - this.emitter = emitter; - return this; + super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); } public RealtimeNode setView(ServerView view) { + Preconditions.checkState(this.view == null, "Cannot set view once it has already been set."); this.view = view; return this; } public RealtimeNode setMetadataUpdater(MetadataUpdater metadataUpdater) { + Preconditions.checkState(this.metadataUpdater == null, "Cannot set metadataUpdater once it has already been set."); this.metadataUpdater = metadataUpdater; return this; } - public RealtimeNode setConglomerate(QueryRunnerFactoryConglomerate conglomerate) - { - this.conglomerate = conglomerate; - return this; - } - public RealtimeNode setSegmentPusher(SegmentPusher segmentPusher) { + Preconditions.checkState(this.segmentPusher == null, "Cannot set segmentPusher once it has already been set."); this.segmentPusher = segmentPusher; return this; } public RealtimeNode setFireDepartments(List fireDepartments) { + Preconditions.checkState(this.fireDepartments == null, "Cannot set fireDepartments once it has already been set."); this.fireDepartments = fireDepartments; return this; } - public RealtimeNode setMonitors(List monitors) - { - this.monitors = Lists.newArrayList(monitors); - return this; - } - - public void setServer(Server server) - { - this.server = server; - } - public RealtimeNode registerJacksonInjectable(String name, Object object) { + Preconditions.checkState(injectablesMap.containsKey(name), "Already registered jackson object[%s]", name); injectablesMap.put(name, object); return this; } - public RealtimeNode registerJacksonSubtype(Class... clazzes) + public MetadataUpdater getMetadataUpdater() { - jsonMapper.registerSubtypes(clazzes); - return this; + initializeMetadataUpdater(); + return metadataUpdater; } - public RealtimeNode registerJacksonSubtype(NamedType... namedTypes) + public SegmentPusher getSegmentPusher() { - jsonMapper.registerSubtypes(namedTypes); - return this; + initializeSegmentPusher(); + return segmentPusher; } - private void init() throws Exception + public List getFireDepartments() { - if (phoneBook == null) { - final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle); - phoneBook = Initialization.createYellowPages( - jsonMapper, - zkClient, - "Realtime-ZKYP--%s", - lifecycle - ); - } + initializeFireDepartments(); + return fireDepartments; + } - initializeEmitter(); + public ServerView getView() + { + initializeView(); + return view; + } + + protected void doInit() throws Exception + { initializeView(); initializeMetadataUpdater(); - initializeQueryRunnerFactoryConglomerate(); initializeSegmentPusher(); - initializeMonitors(); - initializeServer(); initializeJacksonInjectables(); + initializeFireDepartments(); + + final Lifecycle lifecycle = getLifecycle(); + final ServiceEmitter emitter = getEmitter(); + final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); + final List monitors = getMonitors(); + monitors.add(new RealtimeMetricsMonitor(fireDepartments)); final RealtimeManager realtimeManager = new RealtimeManager(fireDepartments, conglomerate); lifecycle.addManagedInstance(realtimeManager); - final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); - final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); - final MonitorScheduler monitorScheduler = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), - globalScheduledExec, - emitter, - monitors - ); - lifecycle.addManagedInstance(monitorScheduler); + startMonitoring(monitors); - final RequestLogger requestLogger = Initialization.makeRequestLogger(globalScheduledExec, props); - lifecycle.addManagedInstance(requestLogger); - - final Context v2Druid = new Context(server, "/druid/v2", Context.SESSIONS); + final Context v2Druid = new Context(getServer(), "/druid/v2", Context.SESSIONS); v2Druid.addServlet(new ServletHolder(new StatusServlet()), "/status"); v2Druid.addServlet( - new ServletHolder(new QueryServlet(jsonMapper, smileMapper, realtimeManager, emitter, requestLogger)), + new ServletHolder( + new QueryServlet(getJsonMapper(), getSmileMapper(), realtimeManager, emitter, getRequestLogger()) + ), "/*" ); @@ -258,47 +193,16 @@ public class RealtimeNode init(); } - lifecycle.start(); + getLifecycle().start(); } @LifecycleStop public synchronized void stop() { - lifecycle.stop(); + getLifecycle().stop(); } - private void initializeServer() - { - if (server == null) { - server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class)); - - lifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - log.info("Starting Jetty"); - server.start(); - } - - @Override - public void stop() - { - log.info("Stopping Jetty"); - try { - server.stop(); - } - catch (Exception e) { - log.error(e, "Exception thrown while stopping Jetty"); - } - } - } - ); - } - } - - private void initializeJacksonInjectables() + protected void initializeJacksonInjectables() { final Map injectables = Maps.newHashMap(); @@ -306,13 +210,13 @@ public class RealtimeNode injectables.put(entry.getKey(), entry.getValue()); } - injectables.put("queryRunnerFactoryConglomerate", conglomerate); + injectables.put("queryRunnerFactoryConglomerate", getConglomerate()); injectables.put("segmentPusher", segmentPusher); injectables.put("metadataUpdater", metadataUpdater); injectables.put("serverView", view); - injectables.put("serviceEmitter", emitter); + injectables.put("serviceEmitter", getEmitter()); - jsonMapper.setInjectableValues( + getJsonMapper().setInjectableValues( new InjectableValues() { @Override @@ -326,96 +230,70 @@ public class RealtimeNode ); } - private void initializeMonitors() - { - if (monitors == null) { - monitors = Lists.newArrayList(); - monitors.add(new JvmMonitor()); - monitors.add(new SysMonitor()); - } - } - - private void initializeFireDepartments() throws IOException + private void initializeFireDepartments() { if (fireDepartments == null) { - fireDepartments = jsonMapper.readValue( - new File(PropUtils.getProperty(props, "druid.realtime.specFile")), - new TypeReference>(){} - ); + try { + fireDepartments = getJsonMapper().readValue( + new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")), + new TypeReference>(){} + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } } } - private void initializeSegmentPusher() throws S3ServiceException + private void initializeSegmentPusher() { if (segmentPusher == null) { - final RestS3Service s3Client = new RestS3Service( - new AWSCredentials( - PropUtils.getProperty(props, "com.metamx.aws.accessKey"), - PropUtils.getProperty(props, "com.metamx.aws.secretKey") - ) - ); + final Properties props = getProps(); + final RestS3Service s3Client; + try { + s3Client = new RestS3Service( + new AWSCredentials( + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") + ) + ); + } + catch (S3ServiceException e) { + throw Throwables.propagate(e); + } - segmentPusher = new S3SegmentPusher(s3Client, configFactory.build(S3SegmentPusherConfig.class), jsonMapper); + segmentPusher = new S3SegmentPusher(s3Client, getConfigFactory().build(S3SegmentPusherConfig.class), getJsonMapper()); } } - private void initializeQueryRunnerFactoryConglomerate() - { - if (conglomerate == null) { - StupidPool computationBufferPool = ServerInit.makeComputeScratchPool( - PropUtils.getPropertyAsInt(props, "druid.computation.buffer.size", 1024 * 1024 * 1024) - ); - conglomerate = new DefaultQueryRunnerFactoryConglomerate( - ServerInit.initDefaultQueryTypes(configFactory, computationBufferPool) - ); - } - } - - private void initializeMetadataUpdater() + protected void initializeMetadataUpdater() { if (metadataUpdater == null) { metadataUpdater = new MetadataUpdater( - jsonMapper, - configFactory.build(MetadataUpdaterConfig.class), - phoneBook, - new DbConnector(configFactory.build(DbConnectorConfig.class)).getDBI() + getJsonMapper(), + getConfigFactory().build(MetadataUpdaterConfig.class), + getPhoneBook(), + new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI() ); - lifecycle.addManagedInstance(metadataUpdater); + getLifecycle().addManagedInstance(metadataUpdater); } } private void initializeView() { if (view == null) { - final ClientConfig clientConfig = configFactory.build(ClientConfig.class); final MutableServerView view = new OnlyNewSegmentWatcherServerView(); final ClientInventoryManager clientInventoryManager = new ClientInventoryManager( - clientConfig.getClientInventoryManagerConfig(), - phoneBook, + getConfigFactory().build(ClientConfig.class), + getPhoneBook(), view ); - lifecycle.addManagedInstance(clientInventoryManager); + getLifecycle().addManagedInstance(clientInventoryManager); this.view = view; } } - private void initializeEmitter() - { - if (emitter == null) { - final HttpClient httpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), lifecycle - ); - - emitter = new ServiceEmitter( - PropUtils.getProperty(props, "druid.service"), - PropUtils.getProperty(props, "druid.host"), - Emitters.create(props, httpClient, jsonMapper, lifecycle) - ); - } - EmittingLogger.registerEmitter(emitter); - } - public static class Builder { private ObjectMapper jsonMapper = null; @@ -466,7 +344,7 @@ public class RealtimeNode configFactory = Config.createFactory(props); } - return new RealtimeNode(jsonMapper, smileMapper, lifecycle, props, configFactory); + return new RealtimeNode(props, lifecycle, jsonMapper, smileMapper, configFactory); } } } \ No newline at end of file diff --git a/server/src/main/java/com/metamx/druid/BaseServerNode.java b/server/src/main/java/com/metamx/druid/BaseServerNode.java new file mode 100644 index 00000000000..fcb8a1ab46b --- /dev/null +++ b/server/src/main/java/com/metamx/druid/BaseServerNode.java @@ -0,0 +1,124 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.druid.collect.StupidPool; +import com.metamx.druid.initialization.ServerInit; +import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; +import com.metamx.druid.query.QueryRunnerFactory; +import com.metamx.druid.query.QueryRunnerFactoryConglomerate; +import com.metamx.druid.utils.PropUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.skife.config.ConfigurationObjectFactory; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Properties; + +/** + */ +public abstract class BaseServerNode extends BaseNode +{ + private final Map, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap(); + private QueryRunnerFactoryConglomerate conglomerate = null; + private StupidPool computeScratchPool = null; + + public BaseServerNode( + Logger log, + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory + ) + { + super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + } + + public QueryRunnerFactoryConglomerate getConglomerate() + { + initializeQueryRunnerFactoryConglomerate(); + return conglomerate; + } + + public StupidPool getComputeScratchPool() + { + initializeComputeScratchPool(); + return computeScratchPool; + } + + @SuppressWarnings("unchecked") + public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate) + { + checkFieldNotSetAndSet("conglomerate", conglomerate); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setComputeScratchPool(StupidPool computeScratchPool) + { + checkFieldNotSetAndSet("computeScratchPool", computeScratchPool); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T registerQueryRunnerFactory(Class queryClazz, QueryRunnerFactory factory) + { + Preconditions.checkState( + conglomerate == null, + "Registering a QueryRunnerFactory only works when a separate conglomerate is not specified." + ); + Preconditions.checkState( + !additionalFactories.containsKey(queryClazz), "Registered factory for class[%s] multiple times", queryClazz + ); + additionalFactories.put(queryClazz, factory); + return (T) this; + } + + private void initializeComputeScratchPool() + { + if (computeScratchPool == null) { + setComputeScratchPool( + ServerInit.makeComputeScratchPool( + PropUtils.getPropertyAsInt(getProps(), "druid.computation.buffer.size", 1024 * 1024 * 1024) + ) + ); + } + } + + private void initializeQueryRunnerFactoryConglomerate() + { + if (conglomerate == null) { + final Map, QueryRunnerFactory> factories = ServerInit.initDefaultQueryTypes( + getConfigFactory(), getComputeScratchPool() + ); + + for (Map.Entry, QueryRunnerFactory> entry : additionalFactories.entrySet()) { + factories.put(entry.getKey(), entry.getValue()); + } + + setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories)); + } + } +} diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java index 935360834ce..04ce6f0c020 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java +++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java @@ -30,6 +30,7 @@ import org.joda.time.Interval; import com.google.common.base.Function; import com.google.common.collect.Ordering; +import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.Query; import com.metamx.druid.StorageAdapter; @@ -94,12 +95,16 @@ public class ServerManager implements QuerySegmentWalker public Map getDataSourceSizes() { - return dataSourceSizes.snapshot(); + synchronized (dataSourceSizes) { + return dataSourceSizes.snapshot(); + } } public Map getDataSourceCounts() { - return dataSourceCounts.snapshot(); + synchronized (dataSourceCounts) { + return dataSourceCounts.snapshot(); + } } public void loadSegment(final DataSegment segment) throws StorageAdapterLoadingException @@ -109,7 +114,12 @@ public class ServerManager implements QuerySegmentWalker adapter = storageAdapterLoader.getAdapter(segment.getLoadSpec()); } catch (StorageAdapterLoadingException e) { - storageAdapterLoader.cleanupAdapter(segment.getLoadSpec()); + try { + storageAdapterLoader.cleanupAdapter(segment.getLoadSpec()); + } + catch (StorageAdapterLoadingException e1) { + // ignore + } throw e; } @@ -140,8 +150,12 @@ public class ServerManager implements QuerySegmentWalker loadedIntervals.add( segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(adapter) ); - dataSourceSizes.add(dataSource, segment.getSize()); - dataSourceCounts.add(dataSource, 1L); + synchronized (dataSourceSizes) { + dataSourceSizes.add(dataSource, segment.getSize()); + } + synchronized (dataSourceCounts) { + dataSourceCounts.add(dataSource, 1L); + } } } @@ -162,8 +176,12 @@ public class ServerManager implements QuerySegmentWalker StorageAdapter oldQueryable = (removed == null) ? null : removed.getObject(); if (oldQueryable != null) { - dataSourceSizes.add(dataSource, -segment.getSize()); - dataSourceCounts.add(dataSource, -1L); + synchronized (dataSourceSizes) { + dataSourceSizes.add(dataSource, -segment.getSize()); + } + synchronized (dataSourceCounts) { + dataSourceCounts.add(dataSource, -1L); + } } else { log.info( "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.", @@ -181,10 +199,7 @@ public class ServerManager implements QuerySegmentWalker { final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { - log.makeAlert("Unknown query type, [%s]", query.getClass()) - .addData("dataSource", query.getDataSource()) - .emit(); - return new NoopQueryRunner(); + throw new ISE("Unknown query type[%s].", query.getClass()); } final QueryToolChest> toolChest = factory.getToolchest(); diff --git a/server/src/main/java/com/metamx/druid/coordination/legacy/S3SizeLookup.java b/server/src/main/java/com/metamx/druid/coordination/legacy/S3SizeLookup.java deleted file mode 100644 index 248b4e6863e..00000000000 --- a/server/src/main/java/com/metamx/druid/coordination/legacy/S3SizeLookup.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.coordination.legacy; - -import java.util.Map; - -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; -import org.jets3t.service.model.S3Object; - -import com.metamx.common.MapUtils; -import com.metamx.common.logger.Logger; - -/** - */ -public class S3SizeLookup implements SizeLookup -{ - private static final Logger log = new Logger(S3SizeLookup.class); - - private final RestS3Service s3Client; - - public S3SizeLookup( - RestS3Service s3Client - ) - { - this.s3Client = s3Client; - } - - @Override - public Long lookupSize(Map loadSpec) - { - String s3Bucket = MapUtils.getString(loadSpec, "bucket"); - String s3Path = MapUtils.getString(loadSpec, "key"); - - S3Object s3Obj = null; - try { - s3Obj = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path); - } - catch (S3ServiceException e) { - log.warn(e, "Exception when trying to lookup size for s3://%s/%s", s3Bucket, s3Path); - return null; - } - - if (s3Obj == null) { - log.warn("s3Object for s3://%s/%s was null.", s3Bucket, s3Path); - return null; - } - - return s3Obj.getContentLength(); - } -} diff --git a/server/src/main/java/com/metamx/druid/coordination/legacy/TheSizeAdjuster.java b/server/src/main/java/com/metamx/druid/coordination/legacy/TheSizeAdjuster.java deleted file mode 100644 index 07f621f2db1..00000000000 --- a/server/src/main/java/com/metamx/druid/coordination/legacy/TheSizeAdjuster.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.coordination.legacy; - -import java.io.IOException; -import java.util.Map; - -import org.I0Itec.zkclient.ZkClient; -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.common.base.Joiner; -import com.metamx.common.MapUtils; -import com.metamx.common.logger.Logger; -import com.metamx.druid.client.DataSegment; - -/** - */ -public class TheSizeAdjuster -{ - private static final Logger log = new Logger(TheSizeAdjuster.class); - private static final Joiner JOINER = Joiner.on("/"); - - private final TheSizeAdjusterConfig config; - private final ObjectMapper jsonMapper; - private final Map lookups; - private final ZkClient zkClient; - - public TheSizeAdjuster( - TheSizeAdjusterConfig config, - ObjectMapper jsonMapper, - Map lookups, - ZkClient zkClient - ) - { - this.config = config; - this.jsonMapper = jsonMapper; - this.lookups = lookups; - this.zkClient = zkClient; - } - - public Long lookupSize(Map descriptor) - { - String type = MapUtils.getString(descriptor, "type"); - SizeLookup adjuster = lookups.get(type); - - if (adjuster == null) { - log.warn("Unknown type[%s] for SizeAdjuster, known types are %s", type, lookups.keySet()); - return null; - } - - return adjuster.lookupSize(descriptor); - } - - public DataSegment updateDescriptor(DataSegment dataSegment) - { - Long size = lookupSize(dataSegment.getLoadSpec()); - - if (size == null || size < 0) { - log.warn("Unable to determine size[%s] of segment[%s], ignoring.", size, dataSegment); - return null; - } - - final DataSegment segment = new DataSegment( - dataSegment.getDataSource(), - dataSegment.getInterval(), - dataSegment.getVersion() + "_w_size", - dataSegment.getLoadSpec(), - dataSegment.getDimensions(), - dataSegment.getMetrics(), - dataSegment.getShardSpec(), - size - ); - - String oldSegmentPath = JOINER.join(config.getSegmentBasePath(), dataSegment.getDataSource(), dataSegment.getIdentifier()); - String newSegmentPath = JOINER.join(config.getSegmentBasePath(), segment.getDataSource(), segment.getIdentifier()); - try { - String data = jsonMapper.writeValueAsString(segment); - zkClient.createPersistent(newSegmentPath, data); - log.info("Created new segment node[%s] with content[%s]", newSegmentPath, data); - zkClient.delete(oldSegmentPath); - log.info("Deleted old segment node[%s]", oldSegmentPath); - } - catch (IOException e) { - log.warn(e, "Exception thrown on segment[%s]", segment); - return null; - } - - return segment; - } -} diff --git a/server/src/main/java/com/metamx/druid/coordination/legacy/SizeLookup.java b/server/src/main/java/com/metamx/druid/http/ComputeMain.java similarity index 55% rename from server/src/main/java/com/metamx/druid/coordination/legacy/SizeLookup.java rename to server/src/main/java/com/metamx/druid/http/ComputeMain.java index 4015165c547..828d1568214 100644 --- a/server/src/main/java/com/metamx/druid/coordination/legacy/SizeLookup.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeMain.java @@ -17,13 +17,36 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.coordination.legacy; +package com.metamx.druid.http; -import java.util.Map; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.druid.log.LogLevelAdjuster; /** */ -public interface SizeLookup +public class ComputeMain { - public Long lookupSize(Map descriptor); + private static final Logger log = new Logger(ComputeMain.class); + + public static void main(String[] args) throws Exception + { + LogLevelAdjuster.register(); + + Lifecycle lifecycle = new Lifecycle(); + + lifecycle.addManagedInstance( + ComputeNode.builder().build() + ); + + try { + lifecycle.start(); + } + catch (Throwable t) { + log.info(t, "Throwable caught at startup, committing seppuku"); + System.exit(2); + } + + lifecycle.join(); + } } diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java new file mode 100644 index 00000000000..1369b0898d2 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -0,0 +1,236 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.http; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.concurrent.ExecutorServices; +import com.metamx.common.config.Config; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.druid.BaseServerNode; +import com.metamx.druid.client.DruidServer; +import com.metamx.druid.client.DruidServerConfig; +import com.metamx.druid.coordination.ServerManager; +import com.metamx.druid.coordination.ZkCoordinator; +import com.metamx.druid.coordination.ZkCoordinatorConfig; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ServerInit; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.QueryableLoaderConfig; +import com.metamx.druid.loading.StorageAdapterLoader; +import com.metamx.druid.metrics.ServerMonitor; +import com.metamx.druid.query.QueryRunnerFactoryConglomerate; +import com.metamx.druid.utils.PropUtils; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.Monitor; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.smile.SmileFactory; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.security.AWSCredentials; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; + +/** + */ +public class ComputeNode extends BaseServerNode +{ + private static final Logger log = new Logger(ComputeNode.class); + + public static Builder builder() + { + return new Builder(); + } + + private DruidServer druidServer; + private StorageAdapterLoader adapterLoader; + + public ComputeNode( + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory + ) + { + super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + } + + public ComputeNode setAdapterLoader(StorageAdapterLoader storageAdapterLoader) + { + Preconditions.checkState(this.adapterLoader == null, "Cannot set adapterLoader once it has already been set."); + this.adapterLoader = storageAdapterLoader; + return this; + } + + public ComputeNode setDruidServer(DruidServer druidServer) + { + Preconditions.checkState(this.druidServer == null, "Cannot set druidServer once it has already been set."); + this.druidServer = druidServer; + return this; + } + + public DruidServer getDruidServer() + { + initializeDruidServer(); + return druidServer; + } + + public StorageAdapterLoader getAdapterLoader() + { + initializeAdapterLoader(); + return adapterLoader; + } + + protected void doInit() throws Exception + { + initializeDruidServer(); + initializeAdapterLoader(); + + final Lifecycle lifecycle = getLifecycle(); + final ServiceEmitter emitter = getEmitter(); + final List monitors = getMonitors(); + final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); + + final ExecutorService executorService = ExecutorServices.create( + getLifecycle(), + getConfigFactory().buildWithReplacements( + ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") + ) + ); + ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService); + + final ZkCoordinator coordinator = new ZkCoordinator( + getJsonMapper(), + getConfigFactory().build(ZkCoordinatorConfig.class), + druidServer, + getPhoneBook(), + serverManager, + emitter + ); + lifecycle.addManagedInstance(coordinator); + + monitors.add(new ServerMonitor(getDruidServer(), serverManager)); + startMonitoring(monitors); + + final Context root = new Context(getServer(), "/", Context.SESSIONS); + + root.addServlet(new ServletHolder(new StatusServlet()), "/status"); + root.addServlet( + new ServletHolder( + new QueryServlet(getJsonMapper(), getSmileMapper(), serverManager, emitter, getRequestLogger()) + ), + "/*" + ); + } + + private void initializeAdapterLoader() + { + if (adapterLoader == null) { + final Properties props = getProps(); + try { + final RestS3Service s3Client = new RestS3Service( + new AWSCredentials( + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") + ) + ); + + setAdapterLoader( + ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(QueryableLoaderConfig.class)) + ); + } + catch (S3ServiceException e) { + throw Throwables.propagate(e); + } + } + } + + private void initializeDruidServer() + { + if (druidServer == null) { + setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class))); + } + } + + public static class Builder + { + private ObjectMapper jsonMapper = null; + private ObjectMapper smileMapper = null; + private Lifecycle lifecycle = null; + private Properties props = null; + private ConfigurationObjectFactory configFactory = null; + + public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper) + { + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; + return this; + } + + public Builder withProps(Properties props) + { + this.props = props; + return this; + } + + public Builder withConfigFactory(ConfigurationObjectFactory configFactory) + { + this.configFactory = configFactory; + return this; + } + + public ComputeNode build() + { + if (jsonMapper == null && smileMapper == null) { + jsonMapper = new DefaultObjectMapper(); + smileMapper = new DefaultObjectMapper(new SmileFactory()); + smileMapper.getJsonFactory().setCodec(smileMapper); + } + else if (jsonMapper == null || smileMapper == null) { + throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); + } + + if (lifecycle == null) { + lifecycle = new Lifecycle(); + } + + if (props == null) { + props = Initialization.loadProperties(); + } + + if (configFactory == null) { + configFactory = Config.createFactory(props); + } + + return new ComputeNode(props, lifecycle, jsonMapper, smileMapper, configFactory); + } + } + +} diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index edac3707833..1029d6bd0c5 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -39,7 +39,6 @@ import org.skife.jdbi.v2.DBI; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.servlet.GuiceFilter; @@ -54,10 +53,6 @@ import com.metamx.druid.client.ServerInventoryManager; import com.metamx.druid.client.ServerInventoryManagerConfig; import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.coordination.DruidClusterInfoConfig; -import com.metamx.druid.coordination.legacy.S3SizeLookup; -import com.metamx.druid.coordination.legacy.SizeLookup; -import com.metamx.druid.coordination.legacy.TheSizeAdjuster; -import com.metamx.druid.coordination.legacy.TheSizeAdjusterConfig; import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DbConnector; @@ -71,6 +66,7 @@ import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMasterConfig; import com.metamx.druid.master.LoadQueuePeon; +import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.core.Emitters; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; @@ -86,6 +82,20 @@ import com.metamx.phonebook.PhoneBook; import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.x.discovery.ServiceDiscovery; import com.netflix.curator.x.discovery.ServiceProvider; +import org.I0Itec.zkclient.ZkClient; +import org.codehaus.jackson.map.ObjectMapper; +import org.mortbay.jetty.Server; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.DefaultServlet; +import org.mortbay.jetty.servlet.FilterHolder; +import org.mortbay.jetty.servlet.ServletHolder; +import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; + +import java.net.URL; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; /** */ @@ -107,21 +117,14 @@ public class MasterMain ); final ServiceEmitter emitter = new ServiceEmitter( - props.getProperty("druid.service"), - props.getProperty("druid.host"), + PropUtils.getProperty(props, "druid.service"), + PropUtils.getProperty(props, "druid.host"), Emitters.create(props, httpClient, jsonMapper, lifecycle) ); - final RestS3Service s3Client = new RestS3Service( - new AWSCredentials( - props.getProperty("com.metamx.aws.accessKey"), - props.getProperty("com.metamx.aws.secretKey") - ) - ); - final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle); - final PhoneBook masterYp = Initialization.createYellowPages(jsonMapper, zkClient, "Master-ZKYP--%s", lifecycle); + final PhoneBook masterYp = Initialization.createPhoneBook(jsonMapper, zkClient, "Master-ZKYP--%s", lifecycle); final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final SegmentInventoryManager segmentInventoryManager = @@ -184,14 +187,6 @@ public class MasterMain jsonMapper, databaseSegmentManager, serverInventoryManager, - new TheSizeAdjuster( - configFactory.build(TheSizeAdjusterConfig.class), - jsonMapper, - ImmutableMap.of( - "s3", new S3SizeLookup(s3Client) - ), - zkClient - ), masterYp, emitter, scheduledExecutorFactory, diff --git a/server/src/main/java/com/metamx/druid/http/ServerMain.java b/server/src/main/java/com/metamx/druid/http/ServerMain.java index ed82c45e600..11dcb879584 100644 --- a/server/src/main/java/com/metamx/druid/http/ServerMain.java +++ b/server/src/main/java/com/metamx/druid/http/ServerMain.java @@ -19,196 +19,26 @@ package com.metamx.druid.http; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; - -import org.I0Itec.zkclient.ZkClient; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.smile.SmileFactory; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; -import org.mortbay.jetty.Server; -import org.mortbay.jetty.servlet.Context; -import org.mortbay.jetty.servlet.ServletHolder; -import org.skife.config.ConfigurationObjectFactory; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.metamx.common.concurrent.ExecutorServiceConfig; -import com.metamx.common.concurrent.ExecutorServices; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.config.Config; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.logger.Logger; -import com.metamx.druid.Query; -import com.metamx.druid.client.DruidServer; -import com.metamx.druid.client.DruidServerConfig; -import com.metamx.druid.collect.StupidPool; -import com.metamx.druid.coordination.ServerManager; -import com.metamx.druid.coordination.ZkCoordinator; -import com.metamx.druid.coordination.ZkCoordinatorConfig; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServerInit; -import com.metamx.druid.initialization.ZkClientConfig; -import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.loading.QueryableLoaderConfig; -import com.metamx.druid.log.LogLevelAdjuster; -import com.metamx.druid.metrics.ServerMonitor; -import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; -import com.metamx.druid.query.QueryRunnerFactory; -import com.metamx.emitter.core.Emitters; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; -import com.metamx.metrics.JvmMonitor; -import com.metamx.metrics.Monitor; -import com.metamx.metrics.MonitorScheduler; -import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; -import com.metamx.phonebook.PhoneBook; - /** */ +@Deprecated public class ServerMain { - private static final Logger log = new Logger(ServerMain.class); - public static void main(String[] args) throws Exception { - LogLevelAdjuster.register(); - - final ObjectMapper jsonMapper = new DefaultObjectMapper(); - final ObjectMapper smileMapper = new DefaultObjectMapper(new SmileFactory()); - smileMapper.getJsonFactory().setCodec(smileMapper); - - final Properties props = Initialization.loadProperties(); - final ConfigurationObjectFactory configFactory = Config.createFactory(props); - final Lifecycle lifecycle = new Lifecycle(); - - final HttpClient httpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), lifecycle - ); - - final ServiceEmitter emitter = new ServiceEmitter( - props.getProperty("druid.service"), - props.getProperty("druid.host"), - Emitters.create(props, httpClient, jsonMapper, lifecycle) - ); - - final ExecutorService executorService = ExecutorServices.create( - lifecycle, - configFactory.buildWithReplacements( - ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") - ) - ); - - StupidPool computationBufferPool = ServerInit.makeComputeScratchPool( - Integer.parseInt(props.getProperty("druid.computation.buffer.size", String.valueOf(1024 * 1024 * 1024))) - ); - - Map, QueryRunnerFactory> queryRunners = ServerInit.initDefaultQueryTypes( - configFactory, - computationBufferPool - ); - - final RestS3Service s3Client = new RestS3Service( - new AWSCredentials(props.getProperty("com.metamx.aws.accessKey"), props.getProperty("com.metamx.aws.secretKey")) - ); - QueryableLoaderConfig queryableLoaderConfig = configFactory.build(QueryableLoaderConfig.class); - final ServerManager serverManager = new ServerManager( - ServerInit.makeDefaultQueryableLoader(s3Client, queryableLoaderConfig), - new DefaultQueryRunnerFactoryConglomerate(queryRunners), - emitter, - executorService - ); - - final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle); - - final DruidServer druidServer = new DruidServer(configFactory.build(DruidServerConfig.class)); - final PhoneBook coordinatorYp = Initialization.createYellowPages( - jsonMapper, - zkClient, - "Coordinator-ZKYP--%s", - lifecycle - ); - final ZkCoordinator coordinator = new ZkCoordinator( - jsonMapper, - configFactory.build(ZkCoordinatorConfig.class), - druidServer, - coordinatorYp, - serverManager, - emitter - ); - lifecycle.addManagedInstance(coordinator); - - final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); - - final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); - final List monitors = Lists.newArrayList( - new ServerMonitor(druidServer, serverManager), - new JvmMonitor() - ); - if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "true"))) { - monitors.add(new SysMonitor()); - } - - final MonitorScheduler healthMonitor = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), - globalScheduledExec, - emitter, - monitors - ); - lifecycle.addManagedInstance(healthMonitor); - - final RequestLogger requestLogger = Initialization.makeRequestLogger( - scheduledExecutorFactory.create( - 1, - "RequestLogger--%d" - ), - props - ); - lifecycle.addManagedInstance(requestLogger); - - try { - lifecycle.start(); - } - catch (Throwable t) { - log.error(t, "Error when starting up. Failing."); - System.exit(1); - } - - Runtime.getRuntime().addShutdownHook( - new Thread( - new Runnable() - { - @Override - public void run() - { - log.info("Running shutdown hook"); - lifecycle.stop(); - } - } - ) - ); - - final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class)); - final Context root = new Context(server, "/", Context.SESSIONS); - - root.addServlet(new ServletHolder(new StatusServlet()), "/status"); - root.addServlet( - new ServletHolder(new QueryServlet(jsonMapper, smileMapper, serverManager, emitter, requestLogger)), - "/*" - ); - - - server.start(); - server.join(); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead."); + System.out.println("K thx bye."); + ComputeMain.main(args); } } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 0a90e7f99da..acbe1fc1549 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -53,7 +53,6 @@ import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.SegmentInventoryManager; import com.metamx.druid.client.ServerInventoryManager; import com.metamx.druid.coordination.DruidClusterInfo; -import com.metamx.druid.coordination.legacy.TheSizeAdjuster; import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -78,7 +77,6 @@ public class DruidMaster private final DruidClusterInfo clusterInfo; private final DatabaseSegmentManager databaseSegmentManager; private final ServerInventoryManager serverInventoryManager; - private final TheSizeAdjuster sizeAdjuster; private final PhoneBook yp; private final ServiceEmitter emitter; private final ScheduledExecutorService exec; @@ -98,7 +96,6 @@ public class DruidMaster ObjectMapper jsonMapper, DatabaseSegmentManager databaseSegmentManager, ServerInventoryManager serverInventoryManager, - TheSizeAdjuster sizeAdjuster, PhoneBook zkPhoneBook, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, @@ -114,7 +111,6 @@ public class DruidMaster this.databaseSegmentManager = databaseSegmentManager; this.serverInventoryManager = serverInventoryManager; - this.sizeAdjuster = sizeAdjuster; this.yp = zkPhoneBook; this.emitter = emitter; @@ -355,16 +351,7 @@ public class DruidMaster for (DataSegment dataSegment : dataSegments) { if (dataSegment.getSize() < 0) { - log.info("No size on Segment[%s], setting.", dataSegment); - - DataSegment newDataSegment = sizeAdjuster.updateDescriptor(dataSegment); - - if (newDataSegment == null) { - log.warn("newDataSegment was null with old dataSegment[%s]. Skipping.", dataSegment); - continue; - } - - dataSegment = newDataSegment; + log.warn("No size on Segment[%s], wtf?", dataSegment); } availableSegments.add(dataSegment); } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index ebbe1f0f733..ae3755e79c8 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -33,7 +33,6 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.ServerInventoryManager; import com.metamx.druid.client.ZKPhoneBook; -import com.metamx.druid.coordination.legacy.TheSizeAdjuster; import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.metrics.NoopServiceEmitter; import com.metamx.phonebook.PhoneBook; @@ -48,7 +47,6 @@ public class DruidMasterTest private PhoneBook yp; private DatabaseSegmentManager databaseSegmentManager; private ServerInventoryManager serverInventoryManager; - private TheSizeAdjuster theSizeAdjuster; private ScheduledExecutorFactory scheduledExecutorFactory; private DruidServer druidServer; private DataSegment segment; @@ -70,9 +68,6 @@ public class DruidMasterTest databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class); EasyMock.replay(databaseSegmentManager); - theSizeAdjuster = EasyMock.createNiceMock(TheSizeAdjuster.class); - EasyMock.replay(theSizeAdjuster); - scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); EasyMock.replay(scheduledExecutorFactory); @@ -144,7 +139,6 @@ public class DruidMasterTest null, databaseSegmentManager, serverInventoryManager, - theSizeAdjuster, yp, new NoopServiceEmitter(), scheduledExecutorFactory,