1) Refactor BrokerMain into a BrokerMain and a BrokerNode to allow for extensions
2) Rename ServerMain to ComputeMain
3) Refactor ComputeMain into a ComputeMain and a ComputeNode to allow for extensions
4) Remove "TheSizeAdjuster"; it was legacy code that is no longer relevant
5) Fix a bug with registering different IndexIO handlers
6) Return an error message when a query type is unknown, instead of just emitting an alert and returning nothing
7) Create superclasses for the various *Node classes so they can share common code
parent 4928978870
commit 8dd6f5c059
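The common thread in these changes is the Main/Node split: each *Main shrinks to a thin entry point while the corresponding *Node owns the wiring, so other programs can embed or extend a node instead of forking main(). A minimal sketch of what that enables, using only the BrokerNode API added below (MyJerseyModule and MyQuery are hypothetical stand-ins, not part of this commit):

// Illustrative embedding of a broker; not code from this commit.
public class CustomBrokerMain
{
  public static void main(String[] args) throws Exception
  {
    BrokerNode node = BrokerNode.builder().build();  // defaults: fresh Lifecycle, loaded properties, paired mappers

    node.addModule(new MyJerseyModule());        // hypothetical extra Guice module (deprecated hook)
    node.registerJacksonSubtype(MyQuery.class);  // hypothetical custom query subtype
    node.start();                                // runs doInit() once, then starts the Lifecycle
  }
}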
client/src/main/java/com/metamx/druid/BaseNode.java (new file, 413 lines)
@@ -0,0 +1,413 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012  Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */

package com.metamx.druid;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.index.v1.serde.ComplexMetricRegistererer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.Server;
import org.skife.config.ConfigurationObjectFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;

/**
 */
public abstract class BaseNode<T extends BaseNode>
{
  private final Logger log;

  private final Lifecycle lifecycle;
  private final ObjectMapper jsonMapper;
  private final ObjectMapper smileMapper;
  private final Properties props;
  private final ConfigurationObjectFactory configFactory;

  private PhoneBook phoneBook = null;
  private ServiceEmitter emitter = null;
  private List<Monitor> monitors = null;
  private Server server = null;
  private ZkClient zkClient;
  private ScheduledExecutorFactory scheduledExecutorFactory;
  private RequestLogger requestLogger;

  private boolean initialized = false;

  public BaseNode(
      Logger log,
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    this.log = log;
    this.configFactory = configFactory;
    this.props = props;
    this.jsonMapper = jsonMapper;
    this.lifecycle = lifecycle;
    this.smileMapper = smileMapper;

    Preconditions.checkNotNull(props, "props");
    Preconditions.checkNotNull(lifecycle, "lifecycle");
    Preconditions.checkNotNull(jsonMapper, "jsonMapper");
    Preconditions.checkNotNull(smileMapper, "smileMapper");
    Preconditions.checkNotNull(configFactory, "configFactory");

    Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
  }

  @SuppressWarnings("unchecked")
  public T setZkClient(ZkClient zkClient)
  {
    checkFieldNotSetAndSet("zkClient", zkClient);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setPhoneBook(PhoneBook phoneBook)
  {
    checkFieldNotSetAndSet("phoneBook", phoneBook);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setEmitter(ServiceEmitter emitter)
  {
    checkFieldNotSetAndSet("emitter", emitter);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setMonitors(List<Monitor> monitors)
  {
    checkFieldNotSetAndSet("monitors", monitors);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setServer(Server server)
  {
    checkFieldNotSetAndSet("server", server);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setScheduledExecutorFactory(ScheduledExecutorFactory factory)
  {
    checkFieldNotSetAndSet("scheduledExecutorFactory", factory);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setRequestLogger(RequestLogger requestLogger)
  {
    checkFieldNotSetAndSet("requestLogger", requestLogger);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T registerJacksonSubtype(Class<?>... clazzes)
  {
    jsonMapper.registerSubtypes(clazzes);
    smileMapper.registerSubtypes(clazzes);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T registerJacksonSubtype(NamedType... namedTypes)
  {
    jsonMapper.registerSubtypes(namedTypes);
    smileMapper.registerSubtypes(namedTypes);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T registerComplexMetric(ComplexMetricRegistererer registererer)
  {
    registererer.register();
    return (T) this;
  }

  public Lifecycle getLifecycle()
  {
    return lifecycle;
  }

  public ObjectMapper getJsonMapper()
  {
    return jsonMapper;
  }

  public ObjectMapper getSmileMapper()
  {
    return smileMapper;
  }

  public Properties getProps()
  {
    return props;
  }

  public ConfigurationObjectFactory getConfigFactory()
  {
    return configFactory;
  }

  public ZkClient getZkClient()
  {
    initializeZkClient();
    return zkClient;
  }

  public PhoneBook getPhoneBook()
  {
    initializePhoneBook();
    return phoneBook;
  }

  public ServiceEmitter getEmitter()
  {
    initializeEmitter();
    return emitter;
  }

  public List<Monitor> getMonitors()
  {
    initializeMonitors();
    return monitors;
  }

  public Server getServer()
  {
    initializeServer();
    return server;
  }

  public ScheduledExecutorFactory getScheduledExecutorFactory()
  {
    initializeScheduledExecutorFactory();
    return scheduledExecutorFactory;
  }

  public RequestLogger getRequestLogger()
  {
    initializeRequestLogger();
    return requestLogger;
  }

  private void initializeRequestLogger()
  {
    if (requestLogger == null) {
      try {
        setRequestLogger(Initialization.makeRequestLogger(getScheduledExecutorFactory(), getProps()));
      }
      catch (IOException e) {
        throw Throwables.propagate(e);
      }
      lifecycle.addManagedInstance(requestLogger);
    }
  }

  private void initializeScheduledExecutorFactory()
  {
    if (scheduledExecutorFactory == null) {
      setScheduledExecutorFactory(ScheduledExecutors.createFactory(getLifecycle()));
    }
  }

  private void initializeZkClient()
  {
    if (zkClient == null) {
      setZkClient(Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle));
    }
  }

  private void initializePhoneBook()
  {
    if (phoneBook == null) {
      setPhoneBook(
          Initialization.createPhoneBook(
              jsonMapper,
              getZkClient(),
              "PhoneBook--%s",
              lifecycle
          )
      );
    }
  }

  private void initializeServer()
  {
    if (server == null) {
      setServer(Initialization.makeJettyServer(configFactory.build(ServerConfig.class)));

      lifecycle.addHandler(
          new Lifecycle.Handler()
          {
            @Override
            public void start() throws Exception
            {
              log.info("Starting Jetty");
              server.start();
            }

            @Override
            public void stop()
            {
              log.info("Stopping Jetty");
              try {
                server.stop();
              }
              catch (Exception e) {
                log.error(e, "Exception thrown while stopping Jetty");
              }
            }
          }
      );
    }
  }

  private void initializeMonitors()
  {
    if (monitors == null) {
      List<Monitor> theMonitors = Lists.newArrayList();
      theMonitors.add(new JvmMonitor());
      if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "false"))) {
        theMonitors.add(new SysMonitor());
      }

      setMonitors(theMonitors);
    }
  }

  private void initializeEmitter()
  {
    if (emitter == null) {
      final HttpClient httpClient = HttpClientInit.createClient(
          HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
      );

      setEmitter(
          new ServiceEmitter(
              PropUtils.getProperty(props, "druid.service"),
              PropUtils.getProperty(props, "druid.host"),
              Emitters.create(props, httpClient, jsonMapper, lifecycle)
          )
      );
    }
    EmittingLogger.registerEmitter(emitter);
  }

  protected void init() throws Exception
  {
    doInit();
    initialized = true;
  }

  protected abstract void doInit() throws Exception;

  @LifecycleStart
  public synchronized void start() throws Exception
  {
    if (! initialized) {
      init();
    }

    lifecycle.start();
  }

  @LifecycleStop
  public synchronized void stop()
  {
    lifecycle.stop();
  }

  protected ScheduledExecutorService startMonitoring(List<Monitor> monitors)
  {
    final ScheduledExecutorService globalScheduledExec = getScheduledExecutorFactory().create(1, "Global--%d");
    final MonitorScheduler monitorScheduler = new MonitorScheduler(
        getConfigFactory().build(MonitorSchedulerConfig.class),
        globalScheduledExec,
        getEmitter(),
        monitors
    );
    getLifecycle().addManagedInstance(monitorScheduler);
    return globalScheduledExec;
  }

  protected void checkFieldNotSetAndSet(String fieldName, Object value)
  {
    Class<?> theClazz = this.getClass();
    while (theClazz != null && theClazz != Object.class) {
      try {
        final Field field = theClazz.getDeclaredField(fieldName);
        field.setAccessible(true);
        Preconditions.checkState(field.get(this) == null, "Cannot set %s once it has already been set.", fieldName);

        field.set(this, value);
        return;
      }
      catch (NoSuchFieldException e) {
        // Perhaps it is inherited?
        theClazz = theClazz.getSuperclass();
      }
      catch (IllegalAccessException e) {
        throw Throwables.propagate(e);
      }
    }

    throw new ISE("Unknown field[%s] on class[%s]", fieldName, this.getClass());
  }
}
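Two idioms in BaseNode do the heavy lifting: checkFieldNotSetAndSet walks the class hierarchy reflectively and refuses to overwrite a non-null field, and the <T extends BaseNode> type parameter lets every setter return the concrete subclass so fluent chains stay typed. A sketch of how a subclass plugs into both (FooNode and serviceName are invented for illustration, not code from this commit):

// Hypothetical subclass; shows the self-type and the set-once reflection helper.
public class FooNode extends BaseNode<FooNode>
{
  private static final Logger log = new Logger(FooNode.class);

  private String serviceName = null;

  public FooNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  public FooNode setServiceName(String serviceName)
  {
    // getDeclaredField finds FooNode.serviceName before walking up to BaseNode
    checkFieldNotSetAndSet("serviceName", serviceName);
    return this;
  }

  @Override
  protected void doInit() throws Exception
  {
    // subclass-specific wiring would go here
  }
}

// Usage: inherited setters also return FooNode, so the chain never needs a cast;
// a second setServiceName call fails with IllegalStateException from checkFieldNotSetAndSet.
FooNode node = new FooNode(props, lifecycle, jsonMapper, smileMapper, configFactory)
    .setServiceName("foo")
    .setZkClient(zkClient);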
@@ -40,9 +40,9 @@ import com.metamx.http.client.HttpClient;
 
 /**
  */
-public class ClientSideServerView implements MutableServerView
+public class BrokerServerView implements MutableServerView
 {
-  private static final Logger log = new Logger(ClientSideServerView.class);
+  private static final Logger log = new Logger(BrokerServerView.class);
 
   private final Object lock = new Object();
 
@@ -55,7 +55,7 @@ public class ClientSideServerView implements MutableServerView
   private final ObjectMapper smileMapper;
   private final HttpClient httpClient;
 
-  public ClientSideServerView(
+  public BrokerServerView(
       QueryToolChestWarehouse warehose,
       ObjectMapper smileMapper,
       HttpClient httpClient
@@ -23,19 +23,18 @@ import org.skife.config.Config;
 
 /**
  */
-public abstract class ClientConfig
+public abstract class ClientConfig extends InventoryManagerConfig
 {
-  @Config("druid.zk.paths.announcementsPath")
-  public abstract String getAnnouncementsPath();
-
-  @Config("druid.zk.paths.servedSegmentsPath")
-  public abstract String getServedSegmentsPath();
-
-  public InventoryManagerConfig getClientInventoryManagerConfig()
+  public ClientConfig()
   {
-    return new InventoryManagerConfig(
-        getAnnouncementsPath(),
-        getServedSegmentsPath()
-    );
+    super(null, null);
   }
+
+  @Override
+  @Config("druid.zk.paths.announcementsPath")
+  public abstract String getInventoryIdPath();
+
+  @Override
+  @Config("druid.zk.paths.servedSegmentsPath")
+  public abstract String getInventoryPath();
 }
@@ -19,56 +19,9 @@
 
 package com.metamx.druid.http;
 
-import java.util.Properties;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.smile.SmileFactory;
-import org.mortbay.jetty.Server;
-import org.mortbay.jetty.servlet.Context;
-import org.mortbay.jetty.servlet.ServletHolder;
-import org.skife.config.ConfigurationObjectFactory;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.servlet.GuiceFilter;
-import com.metamx.common.concurrent.ScheduledExecutorFactory;
-import com.metamx.common.concurrent.ScheduledExecutors;
-import com.metamx.common.config.Config;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.CachingClusteredClient;
-import com.metamx.druid.client.ClientConfig;
-import com.metamx.druid.client.ClientInventoryManager;
-import com.metamx.druid.client.ClientSideServerView;
-import com.metamx.druid.client.cache.CacheBroker;
-import com.metamx.druid.client.cache.CacheMonitor;
-import com.metamx.druid.client.cache.MapCacheBroker;
-import com.metamx.druid.client.cache.MapCacheBrokerConfig;
-import com.metamx.druid.initialization.Initialization;
-import com.metamx.druid.initialization.ServerConfig;
-import com.metamx.druid.initialization.ServiceDiscoveryConfig;
-import com.metamx.druid.initialization.ZkClientConfig;
-import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.log.LogLevelAdjuster;
-import com.metamx.druid.query.QueryToolChestWarehouse;
-import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
-import com.metamx.emitter.core.Emitters;
-import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-import com.metamx.metrics.JvmMonitor;
-import com.metamx.metrics.Monitor;
-import com.metamx.metrics.MonitorScheduler;
-import com.metamx.metrics.MonitorSchedulerConfig;
-import com.metamx.metrics.SysMonitor;
-import com.metamx.phonebook.PhoneBook;
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.x.discovery.ServiceDiscovery;
 
 /**
  */
@@ -81,125 +34,20 @@ public class BrokerMain
   {
     LogLevelAdjuster.register();
 
-    final ObjectMapper jsonMapper = new DefaultObjectMapper();
-    final ObjectMapper smileMapper = new DefaultObjectMapper(new SmileFactory());
-    smileMapper.getJsonFactory().setCodec(smileMapper);
-    Lifecycle lifecycle = new Lifecycle();
-
-    final Properties props = Initialization.loadProperties();
-    final ConfigurationObjectFactory configFactory = Config.createFactory(props);
-    final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
-    final PhoneBook phoneBook = Initialization.createYellowPages(
-        jsonMapper, zkClient, "Client-ZKYP--%s", lifecycle
-    );
+    final Lifecycle lifecycle = new Lifecycle();
+    lifecycle.addManagedInstance(
+        BrokerNode.builder().build()
+    );
 
-    final HttpClient httpClient = HttpClientInit.createClient(
-        HttpClientConfig.builder()
-                        .withNumConnections(
-                            Integer.parseInt(props.getProperty("druid.client.http.connections"))
-                        )
-                        .build(),
-        lifecycle
-    );
-    final HttpClient emitterHttpClient = HttpClientInit.createClient(
-        HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
-    );
-    final ServiceEmitter emitter = new ServiceEmitter(
-        props.getProperty("druid.service"),
-        props.getProperty("druid.host"),
-        Emitters.create(props, emitterHttpClient, jsonMapper, lifecycle)
-    );
-
-    final QueryToolChestWarehouse warehouse = new ReflectionQueryToolChestWarehouse();
-    final ClientConfig clientConfig = configFactory.build(ClientConfig.class);
-    final ClientSideServerView view = new ClientSideServerView(warehouse, smileMapper, httpClient);
-    final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
-        clientConfig.getClientInventoryManagerConfig(),
-        phoneBook,
-        view
-    );
-    lifecycle.addManagedInstance(clientInventoryManager);
-
-    final CacheBroker cacheBroker = MapCacheBroker.create(
-        configFactory.buildWithReplacements(MapCacheBrokerConfig.class, ImmutableMap.of("prefix", "druid.bard.cache"))
-    );
-    final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, smileMapper);
-    lifecycle.addManagedInstance(baseClient);
-
-    final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
-    final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
-    final MonitorScheduler monitorScheduler = new MonitorScheduler(
-        configFactory.build(MonitorSchedulerConfig.class),
-        globalScheduledExec,
-        emitter,
-        ImmutableList.<Monitor>of(
-            new JvmMonitor(),
-            new SysMonitor(),
-            new CacheMonitor(cacheBroker)
-        )
-    );
-    lifecycle.addManagedInstance(monitorScheduler);
-
-    final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
-    CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
-        serviceDiscoveryConfig.getZkHosts(),
-        lifecycle
-    );
-
-    final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
-        curatorFramework,
-        configFactory.build(ServiceDiscoveryConfig.class),
-        lifecycle
-    );
-
-    final RequestLogger requestLogger = Initialization.makeRequestLogger(
-        scheduledExecutorFactory.create(
-            1,
-            "RequestLogger--%d"
-        ),
-        props
-    );
-    lifecycle.addManagedInstance(requestLogger);
-
-    final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, emitter, baseClient);
-
-    final Injector injector = Guice.createInjector(new ClientServletModule(texasRanger, clientInventoryManager, jsonMapper));
-    final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
-    final Context root = new Context(server, "/druid/v2", Context.SESSIONS);
-
-    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
-    root.addServlet(
-        new ServletHolder(new QueryServlet(jsonMapper, smileMapper, texasRanger, emitter, requestLogger)),
-        "/*"
-    );
-
-    root.addEventListener(new GuiceServletConfig(injector));
-    root.addFilter(GuiceFilter.class, "/heatmap/*", 0);
-    root.addFilter(GuiceFilter.class, "/datasources/*", 0);
-
     try {
       lifecycle.start();
     }
     catch (Throwable t) {
-      log.error(t, "Error when starting up.  Failing.");
-      System.exit(1);
+      log.info(t, "Throwable caught at startup, committing seppuku");
+      System.exit(2);
     }
 
     Runtime.getRuntime().addShutdownHook(
         new Thread(
             new Runnable()
             {
              @Override
              public void run()
              {
                log.info("Running shutdown hook");
                lifecycle.stop();
              }
            }
        )
    );
 
-    server.start();
-    server.join();
+    lifecycle.join();
   }
 }
client/src/main/java/com/metamx/druid/http/BrokerNode.java (new file, 322 lines)
@@ -0,0 +1,322 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012  Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */

package com.metamx.druid.http;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseNode;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.cache.CacheBroker;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCacheBroker;
import com.metamx.druid.client.cache.MapCacheBrokerConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
import com.metamx.druid.utils.PropUtils;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.Monitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;

import java.util.List;
import java.util.Properties;

/**
 */

public class BrokerNode extends BaseNode<BrokerNode>
{
  private static final Logger log = new Logger(BrokerNode.class);

  private final List<Module> extraModules = Lists.newArrayList();
  private final List<String> pathsForGuiceFilter = Lists.newArrayList();

  private QueryToolChestWarehouse warehouse = null;
  private HttpClient brokerHttpClient = null;
  private CacheBroker cacheBroker = null;

  private boolean useDiscovery = true;

  public static Builder builder()
  {
    return new Builder();
  }

  public BrokerNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  public QueryToolChestWarehouse getWarehouse()
  {
    initializeWarehouse();
    return warehouse;
  }

  public BrokerNode setWarehouse(QueryToolChestWarehouse warehouse)
  {
    checkFieldNotSetAndSet("warehouse", warehouse);
    return this;
  }

  public HttpClient getBrokerHttpClient()
  {
    initializeBrokerHttpClient();
    return brokerHttpClient;
  }

  public BrokerNode setBrokerHttpClient(HttpClient brokerHttpClient)
  {
    checkFieldNotSetAndSet("brokerHttpClient", brokerHttpClient);
    return this;
  }

  public CacheBroker getCacheBroker()
  {
    initializeCacheBroker();
    return cacheBroker;
  }

  public BrokerNode setCacheBroker(CacheBroker cacheBroker)
  {
    checkFieldNotSetAndSet("cacheBroker", cacheBroker);
    return this;
  }

  public BrokerNode useDiscovery(boolean useDiscovery)
  {
    this.useDiscovery = useDiscovery;
    return this;
  }

  /**
   * This method allows you to specify more Guice modules to use; the primary use case is injecting extra Jersey
   * resources.  I'd like to remove the Guice dependency for this, but I don't know how to set up Jersey without
   * Guice...
   *
   * This is deprecated because at some point in the future, we will eliminate the Guice dependency and anything
   * that uses this will break.  Use at your own risk.
   *
   * @param module the module to register with Guice
   *
   * @return this
   */
  @Deprecated
  public BrokerNode addModule(Module module)
  {
    extraModules.add(module);
    return this;
  }

  /**
   * This method is used to specify extra paths that the GuiceFilter should pay attention to.
   *
   * This is deprecated for the same reason that addModule is deprecated.
   *
   * @param path the path that the GuiceFilter should pay attention to.
   *
   * @return this
   */
  @Deprecated
  public BrokerNode addPathForGuiceFilter(String path)
  {
    pathsForGuiceFilter.add(path);
    return this;
  }

  @Override
  protected void doInit() throws Exception
  {
    initializeWarehouse();
    initializeBrokerHttpClient();
    initializeCacheBroker();
    initializeDiscovery();

    final Lifecycle lifecycle = getLifecycle();

    final List<Monitor> monitors = getMonitors();
    monitors.add(new CacheMonitor(cacheBroker));
    startMonitoring(monitors);

    final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient);
    final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
        getConfigFactory().build(ClientConfig.class), getPhoneBook(), view
    );
    lifecycle.addManagedInstance(clientInventoryManager);

    final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper());
    lifecycle.addManagedInstance(baseClient);


    final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, getEmitter(), baseClient);

    List<Module> theModules = Lists.newArrayList();
    theModules.add(new ClientServletModule(texasRanger, clientInventoryManager, getJsonMapper()));
    theModules.addAll(extraModules);

    final Injector injector = Guice.createInjector(theModules);
    final Context root = new Context(getServer(), "/druid/v2", Context.SESSIONS);

    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
    root.addServlet(
        new ServletHolder(new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())),
        "/*"
    );

    root.addEventListener(new GuiceServletConfig(injector));
    root.addFilter(GuiceFilter.class, "/datasources/*", 0);

    for (String path : pathsForGuiceFilter) {
      root.addFilter(GuiceFilter.class, path, 0);
    }
  }

  private void initializeDiscovery() throws Exception
  {
    if (useDiscovery) {
      final Lifecycle lifecycle = getLifecycle();

      final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
      CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
          serviceDiscoveryConfig.getZkHosts(), lifecycle
      );

      final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
          curatorFramework, serviceDiscoveryConfig, lifecycle
      );
    }
  }

  private void initializeCacheBroker()
  {
    if (cacheBroker == null) {
      setCacheBroker(
          MapCacheBroker.create(
              getConfigFactory().buildWithReplacements(
                  MapCacheBrokerConfig.class,
                  ImmutableMap.of("prefix", "druid.bard.cache")
              )
          )
      );
    }
  }

  private void initializeBrokerHttpClient()
  {
    if (brokerHttpClient == null) {
      setBrokerHttpClient(
          HttpClientInit.createClient(
              HttpClientConfig
                  .builder()
                  .withNumConnections(PropUtils.getPropertyAsInt(getProps(), "druid.client.http.connections"))
                  .build(),
              getLifecycle()
          )
      );
    }
  }

  private void initializeWarehouse()
  {
    if (warehouse == null) {
      setWarehouse(new ReflectionQueryToolChestWarehouse());
    }
  }

  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
    private ObjectMapper smileMapper = null;
    private Lifecycle lifecycle = null;
    private Properties props = null;
    private ConfigurationObjectFactory configFactory = null;

    public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
    {
      this.jsonMapper = jsonMapper;
      this.smileMapper = smileMapper;
      return this;
    }

    public Builder withProps(Properties props)
    {
      this.props = props;
      return this;
    }

    public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
    {
      this.configFactory = configFactory;
      return this;
    }

    public BrokerNode build()
    {
      if (jsonMapper == null && smileMapper == null) {
        jsonMapper = new DefaultObjectMapper();
        smileMapper = new DefaultObjectMapper(new SmileFactory());
        smileMapper.getJsonFactory().setCodec(smileMapper);
      }
      else if (jsonMapper == null || smileMapper == null) {
        throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
      }

      if (lifecycle == null) {
        lifecycle = new Lifecycle();
      }

      if (props == null) {
        props = Initialization.loadProperties();
      }

      if (configFactory == null) {
        configFactory = Config.createFactory(props);
      }

      return new BrokerNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
    }
  }
}
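BrokerNode.Builder fills in whichever dependencies the caller leaves unset: a paired JSON/Smile mapper (build() rejects setting only one of the two), a fresh Lifecycle, properties from Initialization.loadProperties(), and a config factory derived from those properties. A sketch of overriding just the properties while accepting the other defaults (the property values here are illustrative, not mandated by this commit):

// Illustrative builder usage; values are placeholders.
Properties props = new Properties();
props.setProperty("druid.service", "broker");
props.setProperty("druid.host", "localhost:8080");
props.setProperty("druid.client.http.connections", "10");

BrokerNode node = BrokerNode.builder()
                            .withProps(props)  // the ConfigurationObjectFactory is derived from these
                            .build();

node.useDiscovery(false);  // e.g. skip Curator-based service discovery in a local test
node.start();              // doInit() runs once, then the Lifecycle starts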
@@ -38,11 +38,13 @@ import com.google.common.base.Throwables;
 import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.metamx.common.ISE;
+import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.ZKPhoneBook;
 import com.metamx.druid.http.FileRequestLogger;
 import com.metamx.druid.http.RequestLogger;
+import com.metamx.druid.utils.PropUtils;
 import com.metamx.druid.zk.StringZkSerializer;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
@@ -88,7 +90,7 @@ public class Initialization
     return retVal;
   }
 
-  public static ZKPhoneBook createYellowPages(
+  public static ZKPhoneBook createPhoneBook(
       ObjectMapper jsonMapper, ZkClient zkClient, String threadNameFormat, Lifecycle lifecycle
   )
   {
@@ -271,15 +273,11 @@ public class Initialization
     return serviceProvider;
   }
 
-  public static RequestLogger makeRequestLogger(ScheduledExecutorService exec, Properties props) throws IOException
+  public static RequestLogger makeRequestLogger(ScheduledExecutorFactory factory, Properties props) throws IOException
   {
-    final String property = "druid.request.logging.dir";
-    final String loggingDir = props.getProperty(property);
-
-    if (loggingDir == null) {
-      throw new ISE("property[%s] not set.", property);
-    }
-
-    return new FileRequestLogger(exec, new File(loggingDir));
+    return new FileRequestLogger(
+        factory.create(1, "RequestLogger-%s"),
+        new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
+    );
   }
 }
@@ -77,7 +77,7 @@ public class IndexIO
 
   public static void registerHandler(IndexIOHandler handler)
   {
-    if (handler == null) {
+    if (IndexIO.handler == null) {
       IndexIO.handler = handler;
     }
     else {
@@ -17,14 +17,17 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
  */
 
-package com.metamx.druid.coordination.legacy;
-
-import org.skife.config.Config;
+package com.metamx.druid.index.v1.serde;
 
 /**
+ * This is a "factory" interface for registering complex metrics in the system.  It exists because I'm unaware of
+ * another way to register the complex serdes in the MR jobs that run on Hadoop.  As such, instances of this interface
+ * must be instantiatable via a no-argument default constructor (the MR jobs on Hadoop use reflection to instantiate
+ * instances).
+ *
+ * The name is not a typo; I felt that it needed an extra "er" to make the pronunciation that much more difficult.
  */
-public abstract class TheSizeAdjusterConfig
+public interface ComplexMetricRegistererer
 {
-  @Config("druid.zk.paths.indexesPath")
-  public abstract String getSegmentBasePath();
+  public void register();
 }
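Given the reflection constraint described in that javadoc, an implementation must keep a public no-argument constructor and do all of its work inside register(). A hedged sketch; the body is left as a comment because the concrete serde registry call is not part of this commit:

// Hypothetical implementation; a Hadoop MR job would instantiate it via
// something like Class.forName(name).newInstance() and then call register().
public class MyMetricRegistererer implements ComplexMetricRegistererer
{
  public MyMetricRegistererer()
  {
    // must stay no-argument so reflective instantiation works
  }

  @Override
  public void register()
  {
    // register the complex metric's serde with the engine's registry here
  }
}

On a node, the same object can be applied directly via BaseNode.registerComplexMetric(new MyMetricRegistererer()).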
@@ -380,8 +380,8 @@ public class IndexerCoordinatorNode
     if (taskToolbox == null) {
       final RestS3Service s3Client = new RestS3Service(
           new AWSCredentials(
-              props.getProperty("com.metamx.aws.accessKey"),
-              props.getProperty("com.metamx.aws.secretKey")
+              PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
+              PropUtils.getProperty(props, "com.metamx.aws.secretKey")
           )
       );
       final SegmentPusher segmentPusher = new S3SegmentPusher(
@@ -435,7 +435,7 @@ public class IndexerCoordinatorNode
   {
     if (taskInventoryManager == null) {
       final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
-      final PhoneBook masterYp = Initialization.createYellowPages(
+      final PhoneBook masterYp = Initialization.createPhoneBook(
           jsonMapper,
           zkClient,
           "Master-ZKYP--%s",
@@ -280,8 +280,8 @@ public class WorkerNode
     if (taskToolbox == null) {
       final RestS3Service s3Client = new RestS3Service(
           new AWSCredentials(
-              props.getProperty("com.metamx.aws.accessKey"),
-              props.getProperty("com.metamx.aws.secretKey")
+              PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
+              PropUtils.getProperty(props, "com.metamx.aws.secretKey")
           )
       );
       final SegmentPusher segmentPusher = new S3SegmentPusher(
@@ -296,7 +296,7 @@ public class WorkerNode
   public void initializeCuratorFramework() throws IOException
   {
     curatorFramework = Initialization.makeCuratorFrameworkClient(
-        props.getProperty("druid.zk.service.host"),
+        PropUtils.getProperty(props, "druid.zk.service.host"),
         lifecycle
     );
   }
@ -19,75 +19,53 @@
|
||||
|
||||
package com.metamx.druid.realtime;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.codehaus.jackson.map.BeanProperty;
|
||||
import org.codehaus.jackson.map.DeserializationContext;
|
||||
import org.codehaus.jackson.map.InjectableValues;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.jsontype.NamedType;
|
||||
import org.codehaus.jackson.smile.SmileFactory;
|
||||
import org.codehaus.jackson.type.TypeReference;
|
||||
import org.jets3t.service.S3ServiceException;
|
||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||
import org.jets3t.service.security.AWSCredentials;
|
||||
import org.mortbay.jetty.Server;
|
||||
import org.mortbay.jetty.servlet.Context;
|
||||
import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.config.Config;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.BaseServerNode;
|
||||
import com.metamx.druid.client.ClientConfig;
|
||||
import com.metamx.druid.client.ClientInventoryManager;
|
||||
import com.metamx.druid.client.MutableServerView;
|
||||
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
|
||||
import com.metamx.druid.client.ServerView;
|
||||
import com.metamx.druid.collect.StupidPool;
|
||||
import com.metamx.druid.db.DbConnector;
|
||||
import com.metamx.druid.db.DbConnectorConfig;
|
||||
import com.metamx.druid.http.QueryServlet;
|
||||
import com.metamx.druid.http.RequestLogger;
|
||||
import com.metamx.druid.http.StatusServlet;
|
||||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.initialization.ServerConfig;
|
||||
import com.metamx.druid.initialization.ServerInit;
|
||||
import com.metamx.druid.initialization.ZkClientConfig;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
|
||||
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import com.metamx.druid.utils.PropUtils;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.Emitters;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.HttpClientConfig;
|
||||
import com.metamx.http.client.HttpClientInit;
|
||||
import com.metamx.metrics.JvmMonitor;
|
||||
import com.metamx.metrics.Monitor;
|
||||
import com.metamx.metrics.MonitorScheduler;
|
||||
import com.metamx.metrics.MonitorSchedulerConfig;
|
||||
import com.metamx.metrics.SysMonitor;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import org.codehaus.jackson.map.BeanProperty;
|
||||
import org.codehaus.jackson.map.DeserializationContext;
|
||||
import org.codehaus.jackson.map.InjectableValues;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.smile.SmileFactory;
|
||||
import org.codehaus.jackson.type.TypeReference;
|
||||
import org.jets3t.service.S3ServiceException;
|
||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||
import org.jets3t.service.security.AWSCredentials;
|
||||
import org.mortbay.jetty.servlet.Context;
|
||||
import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class RealtimeNode
|
||||
public class RealtimeNode extends BaseServerNode<RealtimeNode>
|
||||
{
|
||||
private static final Logger log = new Logger(RealtimeNode.class);
|
||||
|
||||
@ -96,155 +74,112 @@ public class RealtimeNode
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
private final Lifecycle lifecycle;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final Properties props;
|
||||
private final ConfigurationObjectFactory configFactory;
|
||||
|
||||
private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();
|
||||
|
||||
private PhoneBook phoneBook = null;
|
||||
private ServiceEmitter emitter = null;
|
||||
private ServerView view = null;
|
||||
private MetadataUpdater metadataUpdater = null;
|
||||
private QueryRunnerFactoryConglomerate conglomerate = null;
|
||||
private SegmentPusher segmentPusher = null;
|
||||
private List<FireDepartment> fireDepartments = null;
|
||||
private List<Monitor> monitors = null;
|
||||
private Server server = null;
|
||||
private ServerView view = null;
|
||||
|
||||
private boolean initialized = false;
|
||||
|
||||
public RealtimeNode(
|
||||
ObjectMapper jsonMapper,
|
||||
ObjectMapper smileMapper,
|
||||
Lifecycle lifecycle,
|
||||
Properties props,
|
||||
ConfigurationObjectFactory configFactory
|
||||
Properties props,
|
||||
Lifecycle lifecycle,
|
||||
ObjectMapper jsonMapper,
|
||||
ObjectMapper smileMapper,
|
||||
ConfigurationObjectFactory configFactory
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.smileMapper = smileMapper;
|
||||
this.lifecycle = lifecycle;
|
||||
this.props = props;
|
||||
this.configFactory = configFactory;
|
||||
}
|
||||
|
||||
public RealtimeNode setPhoneBook(PhoneBook phoneBook)
|
||||
{
|
||||
this.phoneBook = phoneBook;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode setEmitter(ServiceEmitter emitter)
|
||||
{
|
||||
this.emitter = emitter;
|
||||
return this;
|
||||
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
|
||||
}
|
||||
|
||||
public RealtimeNode setView(ServerView view)
|
||||
{
|
||||
Preconditions.checkState(this.view == null, "Cannot set view once it has already been set.");
|
||||
this.view = view;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode setMetadataUpdater(MetadataUpdater metadataUpdater)
|
||||
{
|
||||
Preconditions.checkState(this.metadataUpdater == null, "Cannot set metadataUpdater once it has already been set.");
|
||||
this.metadataUpdater = metadataUpdater;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
|
||||
{
|
||||
this.conglomerate = conglomerate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode setSegmentPusher(SegmentPusher segmentPusher)
|
||||
{
|
||||
Preconditions.checkState(this.segmentPusher == null, "Cannot set segmentPusher once it has already been set.");
|
||||
this.segmentPusher = segmentPusher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode setFireDepartments(List<FireDepartment> fireDepartments)
|
||||
{
|
||||
Preconditions.checkState(this.fireDepartments == null, "Cannot set fireDepartments once it has already been set.");
|
||||
this.fireDepartments = fireDepartments;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode setMonitors(List<Monitor> monitors)
|
||||
{
|
||||
this.monitors = Lists.newArrayList(monitors);
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setServer(Server server)
|
||||
{
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
public RealtimeNode registerJacksonInjectable(String name, Object object)
|
||||
{
|
||||
Preconditions.checkState(injectablesMap.containsKey(name), "Already registered jackson object[%s]", name);
|
||||
injectablesMap.put(name, object);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RealtimeNode registerJacksonSubtype(Class<?>... clazzes)
|
||||
public MetadataUpdater getMetadataUpdater()
|
||||
{
|
||||
jsonMapper.registerSubtypes(clazzes);
|
||||
return this;
|
||||
initializeMetadataUpdater();
|
||||
return metadataUpdater;
|
||||
}
|
||||
|
||||
public RealtimeNode registerJacksonSubtype(NamedType... namedTypes)
|
||||
public SegmentPusher getSegmentPusher()
|
||||
{
|
||||
jsonMapper.registerSubtypes(namedTypes);
|
||||
return this;
|
||||
initializeSegmentPusher();
|
||||
return segmentPusher;
|
||||
}
|
||||
|
||||
private void init() throws Exception
|
||||
public List<FireDepartment> getFireDepartments()
|
||||
{
|
||||
if (phoneBook == null) {
|
||||
final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
|
||||
phoneBook = Initialization.createYellowPages(
|
||||
jsonMapper,
|
||||
zkClient,
|
||||
"Realtime-ZKYP--%s",
|
||||
lifecycle
|
||||
);
|
||||
}
|
||||
initializeFireDepartments();
|
||||
return fireDepartments;
|
||||
}
|
||||
|
||||
initializeEmitter();
|
||||
public ServerView getView()
|
||||
{
|
||||
initializeView();
|
||||
return view;
|
||||
}
|
||||
|
||||
protected void doInit() throws Exception
|
||||
{
|
||||
initializeView();
|
||||
initializeMetadataUpdater();
|
||||
initializeQueryRunnerFactoryConglomerate();
|
||||
initializeSegmentPusher();
|
||||
initializeMonitors();
|
||||
initializeServer();
|
||||
initializeJacksonInjectables();
|
||||
|
||||
initializeFireDepartments();
|
||||
|
||||
final Lifecycle lifecycle = getLifecycle();
|
||||
final ServiceEmitter emitter = getEmitter();
|
||||
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
|
||||
final List<Monitor> monitors = getMonitors();
|
||||
|
||||
monitors.add(new RealtimeMetricsMonitor(fireDepartments));
|
||||
|
||||
final RealtimeManager realtimeManager = new RealtimeManager(fireDepartments, conglomerate);
|
||||
lifecycle.addManagedInstance(realtimeManager);
|
||||
|
||||
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
|
||||
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
|
||||
final MonitorScheduler monitorScheduler = new MonitorScheduler(
|
||||
configFactory.build(MonitorSchedulerConfig.class),
|
||||
globalScheduledExec,
|
||||
emitter,
|
||||
monitors
|
||||
);
|
||||
lifecycle.addManagedInstance(monitorScheduler);
|
||||
startMonitoring(monitors);
|
||||
|
||||
final RequestLogger requestLogger = Initialization.makeRequestLogger(globalScheduledExec, props);
|
||||
lifecycle.addManagedInstance(requestLogger);
|
||||
|
||||
final Context v2Druid = new Context(server, "/druid/v2", Context.SESSIONS);
|
||||
final Context v2Druid = new Context(getServer(), "/druid/v2", Context.SESSIONS);
|
||||
v2Druid.addServlet(new ServletHolder(new StatusServlet()), "/status");
|
||||
v2Druid.addServlet(
|
||||
new ServletHolder(new QueryServlet(jsonMapper, smileMapper, realtimeManager, emitter, requestLogger)),
|
||||
new ServletHolder(
|
||||
new QueryServlet(getJsonMapper(), getSmileMapper(), realtimeManager, emitter, getRequestLogger())
|
||||
),
|
||||
"/*"
|
||||
);
|
||||
|
||||
@ -258,47 +193,16 @@ public class RealtimeNode
|
||||
init();
|
||||
}
|
||||
|
||||
lifecycle.start();
|
||||
getLifecycle().start();
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public synchronized void stop()
|
||||
{
|
||||
lifecycle.stop();
|
||||
getLifecycle().stop();
|
||||
}
|
||||
|
||||
private void initializeServer()
|
||||
{
|
||||
if (server == null) {
|
||||
server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
|
||||
|
||||
lifecycle.addHandler(
|
||||
new Lifecycle.Handler()
|
||||
{
|
||||
@Override
|
||||
public void start() throws Exception
|
||||
{
|
||||
log.info("Starting Jetty");
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
log.info("Stopping Jetty");
|
||||
try {
|
||||
server.stop();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception thrown while stopping Jetty");
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeJacksonInjectables()
|
||||
protected void initializeJacksonInjectables()
|
||||
{
|
||||
final Map<String, Object> injectables = Maps.newHashMap();
|
||||
|
||||
@ -306,13 +210,13 @@ public class RealtimeNode
|
||||
injectables.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
injectables.put("queryRunnerFactoryConglomerate", conglomerate);
|
||||
injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
|
||||
injectables.put("segmentPusher", segmentPusher);
|
||||
injectables.put("metadataUpdater", metadataUpdater);
|
||||
injectables.put("serverView", view);
|
||||
injectables.put("serviceEmitter", emitter);
|
||||
injectables.put("serviceEmitter", getEmitter());
|
||||
|
||||
jsonMapper.setInjectableValues(
|
||||
getJsonMapper().setInjectableValues(
|
||||
new InjectableValues()
|
||||
{
|
||||
@Override
|
||||
@ -326,96 +230,70 @@ public class RealtimeNode
|
||||
);
|
||||
}
|
||||
|
||||
private void initializeMonitors()
|
||||
{
|
||||
if (monitors == null) {
|
||||
monitors = Lists.newArrayList();
|
||||
monitors.add(new JvmMonitor());
|
||||
monitors.add(new SysMonitor());
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeFireDepartments() throws IOException
|
||||
private void initializeFireDepartments()
|
||||
{
|
||||
if (fireDepartments == null) {
|
||||
fireDepartments = jsonMapper.readValue(
|
||||
new File(PropUtils.getProperty(props, "druid.realtime.specFile")),
|
||||
new TypeReference<List<FireDepartment>>(){}
|
||||
);
|
||||
try {
|
||||
fireDepartments = getJsonMapper().readValue(
|
||||
new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")),
|
||||
new TypeReference<List<FireDepartment>>(){}
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeSegmentPusher() throws S3ServiceException
|
||||
private void initializeSegmentPusher()
|
||||
{
|
||||
if (segmentPusher == null) {
|
||||
final RestS3Service s3Client = new RestS3Service(
|
||||
new AWSCredentials(
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
);
|
||||
final Properties props = getProps();
|
||||
final RestS3Service s3Client;
|
||||
try {
|
||||
s3Client = new RestS3Service(
|
||||
new AWSCredentials(
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (S3ServiceException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
segmentPusher = new S3SegmentPusher(s3Client, configFactory.build(S3SegmentPusherConfig.class), jsonMapper);
|
||||
segmentPusher = new S3SegmentPusher(s3Client, getConfigFactory().build(S3SegmentPusherConfig.class), getJsonMapper());
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeQueryRunnerFactoryConglomerate()
|
||||
{
|
||||
if (conglomerate == null) {
|
||||
StupidPool<ByteBuffer> computationBufferPool = ServerInit.makeComputeScratchPool(
|
||||
PropUtils.getPropertyAsInt(props, "druid.computation.buffer.size", 1024 * 1024 * 1024)
|
||||
);
|
||||
conglomerate = new DefaultQueryRunnerFactoryConglomerate(
|
||||
ServerInit.initDefaultQueryTypes(configFactory, computationBufferPool)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeMetadataUpdater()
|
||||
protected void initializeMetadataUpdater()
|
||||
{
|
||||
if (metadataUpdater == null) {
|
||||
metadataUpdater = new MetadataUpdater(
|
||||
jsonMapper,
|
||||
configFactory.build(MetadataUpdaterConfig.class),
|
||||
phoneBook,
|
||||
new DbConnector(configFactory.build(DbConnectorConfig.class)).getDBI()
|
||||
getJsonMapper(),
|
||||
getConfigFactory().build(MetadataUpdaterConfig.class),
|
||||
getPhoneBook(),
|
||||
new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI()
|
||||
);
|
||||
lifecycle.addManagedInstance(metadataUpdater);
|
||||
getLifecycle().addManagedInstance(metadataUpdater);
|
||||
}
|
||||
}
|
||||
|
||||
  private void initializeView()
  {
    if (view == null) {
-      final ClientConfig clientConfig = configFactory.build(ClientConfig.class);
      final MutableServerView view = new OnlyNewSegmentWatcherServerView();
      final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
-          clientConfig.getClientInventoryManagerConfig(),
-          phoneBook,
+          getConfigFactory().build(ClientConfig.class),
+          getPhoneBook(),
          view
      );
-      lifecycle.addManagedInstance(clientInventoryManager);
+      getLifecycle().addManagedInstance(clientInventoryManager);

      this.view = view;
    }
  }

-  private void initializeEmitter()
-  {
-    if (emitter == null) {
-      final HttpClient httpClient = HttpClientInit.createClient(
-          HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
-      );
-
-      emitter = new ServiceEmitter(
-          PropUtils.getProperty(props, "druid.service"),
-          PropUtils.getProperty(props, "druid.host"),
-          Emitters.create(props, httpClient, jsonMapper, lifecycle)
-      );
-    }
-    EmittingLogger.registerEmitter(emitter);
-  }

  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
@@ -466,7 +344,7 @@ public class RealtimeNode
      configFactory = Config.createFactory(props);
    }

-    return new RealtimeNode(jsonMapper, smileMapper, lifecycle, props, configFactory);
+    return new RealtimeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
  }
}
}
124
server/src/main/java/com/metamx/druid/BaseServerNode.java
Normal file
@@ -0,0 +1,124 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.skife.config.ConfigurationObjectFactory;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;

/**
 */
public abstract class BaseServerNode<T extends BaseNode> extends BaseNode<T>
{
  private final Map<Class<? extends Query>, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap();
  private QueryRunnerFactoryConglomerate conglomerate = null;
  private StupidPool<ByteBuffer> computeScratchPool = null;

  public BaseServerNode(
      Logger log,
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  public QueryRunnerFactoryConglomerate getConglomerate()
  {
    initializeQueryRunnerFactoryConglomerate();
    return conglomerate;
  }

  public StupidPool<ByteBuffer> getComputeScratchPool()
  {
    initializeComputeScratchPool();
    return computeScratchPool;
  }

  @SuppressWarnings("unchecked")
  public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
  {
    checkFieldNotSetAndSet("conglomerate", conglomerate);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T setComputeScratchPool(StupidPool<ByteBuffer> computeScratchPool)
  {
    checkFieldNotSetAndSet("computeScratchPool", computeScratchPool);
    return (T) this;
  }

  @SuppressWarnings("unchecked")
  public T registerQueryRunnerFactory(Class<? extends Query> queryClazz, QueryRunnerFactory factory)
  {
    Preconditions.checkState(
        conglomerate == null,
        "Registering a QueryRunnerFactory only works when a separate conglomerate is not specified."
    );
    Preconditions.checkState(
        !additionalFactories.containsKey(queryClazz), "Registered factory for class[%s] multiple times", queryClazz
    );
    additionalFactories.put(queryClazz, factory);
    return (T) this;
  }

  private void initializeComputeScratchPool()
  {
    if (computeScratchPool == null) {
      setComputeScratchPool(
          ServerInit.makeComputeScratchPool(
              PropUtils.getPropertyAsInt(getProps(), "druid.computation.buffer.size", 1024 * 1024 * 1024)
          )
      );
    }
  }

  private void initializeQueryRunnerFactoryConglomerate()
  {
    if (conglomerate == null) {
      final Map<Class<? extends Query>, QueryRunnerFactory> factories = ServerInit.initDefaultQueryTypes(
          getConfigFactory(), getComputeScratchPool()
      );

      for (Map.Entry<Class<? extends Query>, QueryRunnerFactory> entry : additionalFactories.entrySet()) {
        factories.put(entry.getKey(), entry.getValue());
      }

      setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories));
    }
  }
}
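The `<T extends BaseNode>` type parameter is the standard Java self-type trick: setters defined once on the base class return the concrete subclass, so calls chain without casting. A self-contained illustration of the idiom, with all names invented (BaseNode/BaseServerNode work the same way):

// Minimal illustration of the self-typed fluent-setter idiom; the classes
// here are invented for the example.
abstract class FluentBase<T extends FluentBase<T>>
{
  private String name;

  @SuppressWarnings("unchecked")
  public T setName(String name)
  {
    this.name = name;
    return (T) this;  // safe as long as subclasses declare themselves as T
  }

  public String getName()
  {
    return name;
  }
}

class ConcreteNode extends FluentBase<ConcreteNode>
{
  public ConcreteNode startUp()
  {
    System.out.println("starting " + getName());
    return this;
  }
}

class Demo
{
  public static void main(String[] args)
  {
    // setName returns ConcreteNode, so the chain keeps the subclass type.
    new ConcreteNode().setName("compute-1").startUp();
  }
}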
@@ -30,6 +30,7 @@ import org.joda.time.Interval;

import com.google.common.base.Function;
import com.google.common.collect.Ordering;
+import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
@@ -94,12 +95,16 @@ public class ServerManager implements QuerySegmentWalker

  public Map<String, Long> getDataSourceSizes()
  {
-    return dataSourceSizes.snapshot();
+    synchronized (dataSourceSizes) {
+      return dataSourceSizes.snapshot();
+    }
  }

  public Map<String, Long> getDataSourceCounts()
  {
-    return dataSourceCounts.snapshot();
+    synchronized (dataSourceCounts) {
+      return dataSourceCounts.snapshot();
+    }
  }

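The point of synchronizing here, and on the add/subtract paths in the hunks below, is that every access to the counter maps now goes through the same monitor, so a snapshot cannot interleave with a half-applied update. A self-contained sketch of the pattern (this SizeTracker is a stand-in, not Druid's actual counter class):

import java.util.HashMap;
import java.util.Map;

// Stand-in for the counter map guarded above: readers and writers all
// synchronize on the map itself, so snapshots are internally consistent.
class SizeTracker
{
  private final Map<String, Long> sizes = new HashMap<String, Long>();

  public void add(String dataSource, long delta)
  {
    synchronized (sizes) {
      Long current = sizes.get(dataSource);
      sizes.put(dataSource, (current == null ? 0L : current) + delta);
    }
  }

  public Map<String, Long> snapshot()
  {
    synchronized (sizes) {
      return new HashMap<String, Long>(sizes);  // copy made under the lock
    }
  }
}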
  public void loadSegment(final DataSegment segment) throws StorageAdapterLoadingException
@@ -109,7 +114,12 @@ public class ServerManager implements QuerySegmentWalker
      adapter = storageAdapterLoader.getAdapter(segment.getLoadSpec());
    }
    catch (StorageAdapterLoadingException e) {
-      storageAdapterLoader.cleanupAdapter(segment.getLoadSpec());
+      try {
+        storageAdapterLoader.cleanupAdapter(segment.getLoadSpec());
+      }
+      catch (StorageAdapterLoadingException e1) {
+        // ignore
+      }
      throw e;
    }

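Wrapping the cleanup call serves one purpose: if cleanupAdapter itself fails, that secondary failure is swallowed so the original loading exception is the one the caller sees. A generic sketch of the best-effort-cleanup pattern (all names invented):

// Generic best-effort-cleanup sketch: the primary failure is never masked
// by a failure that happens while cleaning up after it.
public class CleanupExample
{
  public static void loadOrCleanup(Resource resource) throws LoadingException
  {
    try {
      resource.load();
    }
    catch (LoadingException e) {
      try {
        resource.cleanup();
      }
      catch (LoadingException cleanupFailure) {
        // ignore: rethrowing this would hide the original cause
      }
      throw e;
    }
  }

  interface Resource
  {
    void load() throws LoadingException;
    void cleanup() throws LoadingException;
  }

  static class LoadingException extends Exception {}
}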
@@ -140,8 +150,12 @@ public class ServerManager implements QuerySegmentWalker
      loadedIntervals.add(
          segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(adapter)
      );
-      dataSourceSizes.add(dataSource, segment.getSize());
-      dataSourceCounts.add(dataSource, 1L);
+      synchronized (dataSourceSizes) {
+        dataSourceSizes.add(dataSource, segment.getSize());
+      }
+      synchronized (dataSourceCounts) {
+        dataSourceCounts.add(dataSource, 1L);
+      }
    }
  }

@@ -162,8 +176,12 @@ public class ServerManager implements QuerySegmentWalker
      StorageAdapter oldQueryable = (removed == null) ? null : removed.getObject();

      if (oldQueryable != null) {
-        dataSourceSizes.add(dataSource, -segment.getSize());
-        dataSourceCounts.add(dataSource, -1L);
+        synchronized (dataSourceSizes) {
+          dataSourceSizes.add(dataSource, -segment.getSize());
+        }
+        synchronized (dataSourceCounts) {
+          dataSourceCounts.add(dataSource, -1L);
+        }
      } else {
        log.info(
            "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
@@ -181,10 +199,7 @@ public class ServerManager implements QuerySegmentWalker
  {
    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
    if (factory == null) {
-      log.makeAlert("Unknown query type, [%s]", query.getClass())
-         .addData("dataSource", query.getDataSource())
-         .emit();
-      return new NoopQueryRunner<T>();
+      throw new ISE("Unknown query type[%s].", query.getClass());
    }

    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
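This is item 6 from the commit message: an unknown query type now raises an ISE, which propagates out of the query walker and reaches the client as an error, instead of an alert plus an empty result set that is indistinguishable from "no data". A toy contrast of the two failure modes (all names invented):

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Toy dispatcher contrasting the old and new behavior for unknown types.
class Dispatcher
{
  private final Map<String, List<String>> handlers;

  Dispatcher(Map<String, List<String>> handlers)
  {
    this.handlers = handlers;
  }

  // Old behavior: the caller gets an empty result and cannot tell whether
  // the query matched nothing or was never understood at all.
  List<String> runLenient(String queryType)
  {
    List<String> result = handlers.get(queryType);
    return result == null ? Collections.<String>emptyList() : result;
  }

  // New behavior: fail fast with a message naming the bad type.
  List<String> runStrict(String queryType)
  {
    List<String> result = handlers.get(queryType);
    if (result == null) {
      throw new IllegalStateException(String.format("Unknown query type[%s].", queryType));
    }
    return result;
  }
}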
@@ -1,69 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.coordination.legacy;

import java.util.Map;

import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;

import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;

/**
 */
public class S3SizeLookup implements SizeLookup
{
  private static final Logger log = new Logger(S3SizeLookup.class);

  private final RestS3Service s3Client;

  public S3SizeLookup(
      RestS3Service s3Client
  )
  {
    this.s3Client = s3Client;
  }

  @Override
  public Long lookupSize(Map<String, Object> loadSpec)
  {
    String s3Bucket = MapUtils.getString(loadSpec, "bucket");
    String s3Path = MapUtils.getString(loadSpec, "key");

    S3Object s3Obj = null;
    try {
      s3Obj = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
    }
    catch (S3ServiceException e) {
      log.warn(e, "Exception when trying to lookup size for s3://%s/%s", s3Bucket, s3Path);
      return null;
    }

    if (s3Obj == null) {
      log.warn("s3Object for s3://%s/%s was null.", s3Bucket, s3Path);
      return null;
    }

    return s3Obj.getContentLength();
  }
}
@@ -1,107 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.coordination.legacy;

import java.io.IOException;
import java.util.Map;

import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.base.Joiner;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;

/**
 */
public class TheSizeAdjuster
{
  private static final Logger log = new Logger(TheSizeAdjuster.class);
  private static final Joiner JOINER = Joiner.on("/");

  private final TheSizeAdjusterConfig config;
  private final ObjectMapper jsonMapper;
  private final Map<String, SizeLookup> lookups;
  private final ZkClient zkClient;

  public TheSizeAdjuster(
      TheSizeAdjusterConfig config,
      ObjectMapper jsonMapper,
      Map<String, SizeLookup> lookups,
      ZkClient zkClient
  )
  {
    this.config = config;
    this.jsonMapper = jsonMapper;
    this.lookups = lookups;
    this.zkClient = zkClient;
  }

  public Long lookupSize(Map<String, Object> descriptor)
  {
    String type = MapUtils.getString(descriptor, "type");
    SizeLookup adjuster = lookups.get(type);

    if (adjuster == null) {
      log.warn("Unknown type[%s] for SizeAdjuster, known types are %s", type, lookups.keySet());
      return null;
    }

    return adjuster.lookupSize(descriptor);
  }

  public DataSegment updateDescriptor(DataSegment dataSegment)
  {
    Long size = lookupSize(dataSegment.getLoadSpec());

    if (size == null || size < 0) {
      log.warn("Unable to determine size[%s] of segment[%s], ignoring.", size, dataSegment);
      return null;
    }

    final DataSegment segment = new DataSegment(
        dataSegment.getDataSource(),
        dataSegment.getInterval(),
        dataSegment.getVersion() + "_w_size",
        dataSegment.getLoadSpec(),
        dataSegment.getDimensions(),
        dataSegment.getMetrics(),
        dataSegment.getShardSpec(),
        size
    );

    String oldSegmentPath = JOINER.join(config.getSegmentBasePath(), dataSegment.getDataSource(), dataSegment.getIdentifier());
    String newSegmentPath = JOINER.join(config.getSegmentBasePath(), segment.getDataSource(), segment.getIdentifier());
    try {
      String data = jsonMapper.writeValueAsString(segment);
      zkClient.createPersistent(newSegmentPath, data);
      log.info("Created new segment node[%s] with content[%s]", newSegmentPath, data);
      zkClient.delete(oldSegmentPath);
      log.info("Deleted old segment node[%s]", oldSegmentPath);
    }
    catch (IOException e) {
      log.warn(e, "Exception thrown on segment[%s]", segment);
      return null;
    }

    return segment;
  }
}
@@ -17,13 +17,36 @@
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

-package com.metamx.druid.coordination.legacy;
+package com.metamx.druid.http;

-import java.util.Map;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.log.LogLevelAdjuster;

/**
 */
-public interface SizeLookup
+public class ComputeMain
{
-  public Long lookupSize(Map<String, Object> descriptor);
+  private static final Logger log = new Logger(ComputeMain.class);
+
+  public static void main(String[] args) throws Exception
+  {
+    LogLevelAdjuster.register();
+
+    Lifecycle lifecycle = new Lifecycle();
+
+    lifecycle.addManagedInstance(
+        ComputeNode.builder().build()
+    );
+
+    try {
+      lifecycle.start();
+    }
+    catch (Throwable t) {
+      log.info(t, "Throwable caught at startup, committing seppuku");
+      System.exit(2);
+    }
+
+    lifecycle.join();
+  }
}
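The Main/Node split is what enables the extensions mentioned in the commit message: the entry point shrinks to a few lines of lifecycle wiring, and everything else can be configured on the node before startup. A hedged sketch of a custom entry point built on the same pattern; the extension hook is shown as a comment because MyQuery and MyQueryRunnerFactory would be an extension's own (invented) classes:

import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.http.ComputeNode;

// Hypothetical extension entry point modeled on ComputeMain above.
public class MyComputeMain
{
  public static void main(String[] args) throws Exception
  {
    Lifecycle lifecycle = new Lifecycle();

    lifecycle.addManagedInstance(
        ComputeNode.builder()
                   .build()
        // extension hook added by this commit (MyQuery / MyQueryRunnerFactory
        // would be the extension's classes):
        //     .registerQueryRunnerFactory(MyQuery.class, new MyQueryRunnerFactory())
    );

    lifecycle.start();
    lifecycle.join();
  }
}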
236
server/src/main/java/com/metamx/druid/http/ComputeNode.java
Normal file
@@ -0,0 +1,236 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.http;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;

/**
 */
public class ComputeNode extends BaseServerNode<ComputeNode>
{
  private static final Logger log = new Logger(ComputeNode.class);

  public static Builder builder()
  {
    return new Builder();
  }

  private DruidServer druidServer;
  private StorageAdapterLoader adapterLoader;

  public ComputeNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  public ComputeNode setAdapterLoader(StorageAdapterLoader storageAdapterLoader)
  {
    Preconditions.checkState(this.adapterLoader == null, "Cannot set adapterLoader once it has already been set.");
    this.adapterLoader = storageAdapterLoader;
    return this;
  }

  public ComputeNode setDruidServer(DruidServer druidServer)
  {
    Preconditions.checkState(this.druidServer == null, "Cannot set druidServer once it has already been set.");
    this.druidServer = druidServer;
    return this;
  }

  public DruidServer getDruidServer()
  {
    initializeDruidServer();
    return druidServer;
  }

  public StorageAdapterLoader getAdapterLoader()
  {
    initializeAdapterLoader();
    return adapterLoader;
  }

  protected void doInit() throws Exception
  {
    initializeDruidServer();
    initializeAdapterLoader();

    final Lifecycle lifecycle = getLifecycle();
    final ServiceEmitter emitter = getEmitter();
    final List<Monitor> monitors = getMonitors();
    final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();

    final ExecutorService executorService = ExecutorServices.create(
        getLifecycle(),
        getConfigFactory().buildWithReplacements(
            ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
        )
    );
    ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);

    final ZkCoordinator coordinator = new ZkCoordinator(
        getJsonMapper(),
        getConfigFactory().build(ZkCoordinatorConfig.class),
        druidServer,
        getPhoneBook(),
        serverManager,
        emitter
    );
    lifecycle.addManagedInstance(coordinator);

    monitors.add(new ServerMonitor(getDruidServer(), serverManager));
    startMonitoring(monitors);

    final Context root = new Context(getServer(), "/", Context.SESSIONS);

    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
    root.addServlet(
        new ServletHolder(
            new QueryServlet(getJsonMapper(), getSmileMapper(), serverManager, emitter, getRequestLogger())
        ),
        "/*"
    );
  }

  private void initializeAdapterLoader()
  {
    if (adapterLoader == null) {
      final Properties props = getProps();
      try {
        final RestS3Service s3Client = new RestS3Service(
            new AWSCredentials(
                PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
                PropUtils.getProperty(props, "com.metamx.aws.secretKey")
            )
        );

        setAdapterLoader(
            ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(QueryableLoaderConfig.class))
        );
      }
      catch (S3ServiceException e) {
        throw Throwables.propagate(e);
      }
    }
  }

  private void initializeDruidServer()
  {
    if (druidServer == null) {
      setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class)));
    }
  }

  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
    private ObjectMapper smileMapper = null;
    private Lifecycle lifecycle = null;
    private Properties props = null;
    private ConfigurationObjectFactory configFactory = null;

    public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
    {
      this.jsonMapper = jsonMapper;
      this.smileMapper = smileMapper;
      return this;
    }

    public Builder withProps(Properties props)
    {
      this.props = props;
      return this;
    }

    public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
    {
      this.configFactory = configFactory;
      return this;
    }

    public ComputeNode build()
    {
      if (jsonMapper == null && smileMapper == null) {
        jsonMapper = new DefaultObjectMapper();
        smileMapper = new DefaultObjectMapper(new SmileFactory());
        smileMapper.getJsonFactory().setCodec(smileMapper);
      }
      else if (jsonMapper == null || smileMapper == null) {
        throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
      }

      if (lifecycle == null) {
        lifecycle = new Lifecycle();
      }

      if (props == null) {
        props = Initialization.loadProperties();
      }

      if (configFactory == null) {
        configFactory = Config.createFactory(props);
      }

      return new ComputeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
    }
  }

}
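Because every collaborator on ComputeNode now has a Preconditions-guarded setter, tests and embedders can inject fakes instead of letting initializeAdapterLoader() build the real S3 client from the com.metamx.aws.* properties. A hedged test-wiring sketch using EasyMock (which the project's tests already use); the wrapper class is invented:

import com.metamx.druid.http.ComputeNode;
import com.metamx.druid.loading.StorageAdapterLoader;
import org.easymock.EasyMock;

import java.util.Properties;

// Hypothetical test helper: a mocked StorageAdapterLoader keeps the node
// from ever touching S3 during a unit test.
public class ComputeNodeTestWiring
{
  public static ComputeNode buildTestNode(Properties testProps)
  {
    StorageAdapterLoader fakeLoader = EasyMock.createNiceMock(StorageAdapterLoader.class);
    EasyMock.replay(fakeLoader);

    return ComputeNode.builder()
                      .withProps(testProps)
                      .build()
                      .setAdapterLoader(fakeLoader);
  }
}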
@@ -39,7 +39,6 @@ import org.skife.jdbi.v2.DBI;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
@@ -54,10 +53,6 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.client.ServerInventoryManagerConfig;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.coordination.DruidClusterInfoConfig;
-import com.metamx.druid.coordination.legacy.S3SizeLookup;
-import com.metamx.druid.coordination.legacy.SizeLookup;
-import com.metamx.druid.coordination.legacy.TheSizeAdjuster;
-import com.metamx.druid.coordination.legacy.TheSizeAdjusterConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector;
@@ -71,6 +66,7 @@ import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueuePeon;
+import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
@@ -86,6 +82,20 @@ import com.metamx.phonebook.PhoneBook;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider;
+import org.I0Itec.zkclient.ZkClient;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.servlet.FilterHolder;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;

/**
 */
@@ -107,21 +117,14 @@ public class MasterMain
    );

    final ServiceEmitter emitter = new ServiceEmitter(
-        props.getProperty("druid.service"),
-        props.getProperty("druid.host"),
+        PropUtils.getProperty(props, "druid.service"),
+        PropUtils.getProperty(props, "druid.host"),
        Emitters.create(props, httpClient, jsonMapper, lifecycle)
    );

-    final RestS3Service s3Client = new RestS3Service(
-        new AWSCredentials(
-            props.getProperty("com.metamx.aws.accessKey"),
-            props.getProperty("com.metamx.aws.secretKey")
-        )
-    );
-
    final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);

-    final PhoneBook masterYp = Initialization.createYellowPages(jsonMapper, zkClient, "Master-ZKYP--%s", lifecycle);
+    final PhoneBook masterYp = Initialization.createPhoneBook(jsonMapper, zkClient, "Master-ZKYP--%s", lifecycle);
    final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);

    final SegmentInventoryManager segmentInventoryManager =
@@ -184,14 +187,6 @@ public class MasterMain
        jsonMapper,
        databaseSegmentManager,
        serverInventoryManager,
-        new TheSizeAdjuster(
-            configFactory.build(TheSizeAdjusterConfig.class),
-            jsonMapper,
-            ImmutableMap.<String, SizeLookup>of(
-                "s3", new S3SizeLookup(s3Client)
-            ),
-            zkClient
-        ),
        masterYp,
        emitter,
        scheduledExecutorFactory,
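Swapping props.getProperty for PropUtils.getProperty makes a missing required property fail at startup with a message naming the property, instead of surfacing later as a null host or service name. A self-contained sketch of the same idea (PropUtils is the real class in the patch; the helper below is a stand-in written for illustration):

import java.util.Properties;

// Stand-in for PropUtils.getProperty: required lookups fail loudly at startup.
class RequiredProps
{
  static String get(Properties props, String name)
  {
    String value = props.getProperty(name);
    if (value == null) {
      throw new IllegalStateException(String.format("Property[%s] not set.", name));
    }
    return value;
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.service", "master");
    System.out.println(get(props, "druid.service"));  // prints "master"
    get(props, "druid.host");  // throws: Property[druid.host] not set.
  }
}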
@@ -19,196 +19,26 @@

package com.metamx.druid.http;

-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.smile.SmileFactory;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.security.AWSCredentials;
-import org.mortbay.jetty.Server;
-import org.mortbay.jetty.servlet.Context;
-import org.mortbay.jetty.servlet.ServletHolder;
-import org.skife.config.ConfigurationObjectFactory;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.metamx.common.concurrent.ExecutorServiceConfig;
-import com.metamx.common.concurrent.ExecutorServices;
-import com.metamx.common.concurrent.ScheduledExecutorFactory;
-import com.metamx.common.concurrent.ScheduledExecutors;
-import com.metamx.common.config.Config;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.Query;
-import com.metamx.druid.client.DruidServer;
-import com.metamx.druid.client.DruidServerConfig;
-import com.metamx.druid.collect.StupidPool;
-import com.metamx.druid.coordination.ServerManager;
-import com.metamx.druid.coordination.ZkCoordinator;
-import com.metamx.druid.coordination.ZkCoordinatorConfig;
-import com.metamx.druid.initialization.Initialization;
-import com.metamx.druid.initialization.ServerConfig;
-import com.metamx.druid.initialization.ServerInit;
-import com.metamx.druid.initialization.ZkClientConfig;
-import com.metamx.druid.jackson.DefaultObjectMapper;
-import com.metamx.druid.loading.QueryableLoaderConfig;
-import com.metamx.druid.log.LogLevelAdjuster;
-import com.metamx.druid.metrics.ServerMonitor;
-import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
-import com.metamx.druid.query.QueryRunnerFactory;
-import com.metamx.emitter.core.Emitters;
-import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-import com.metamx.metrics.JvmMonitor;
-import com.metamx.metrics.Monitor;
-import com.metamx.metrics.MonitorScheduler;
-import com.metamx.metrics.MonitorSchedulerConfig;
-import com.metamx.metrics.SysMonitor;
-import com.metamx.phonebook.PhoneBook;

/**
 */
+@Deprecated
public class ServerMain
{
-  private static final Logger log = new Logger(ServerMain.class);

  public static void main(String[] args) throws Exception
  {
-    LogLevelAdjuster.register();

-    final ObjectMapper jsonMapper = new DefaultObjectMapper();
-    final ObjectMapper smileMapper = new DefaultObjectMapper(new SmileFactory());
-    smileMapper.getJsonFactory().setCodec(smileMapper);
-
-    final Properties props = Initialization.loadProperties();
-    final ConfigurationObjectFactory configFactory = Config.createFactory(props);
-    final Lifecycle lifecycle = new Lifecycle();
-
-    final HttpClient httpClient = HttpClientInit.createClient(
-        HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
-    );
-
-    final ServiceEmitter emitter = new ServiceEmitter(
-        props.getProperty("druid.service"),
-        props.getProperty("druid.host"),
-        Emitters.create(props, httpClient, jsonMapper, lifecycle)
-    );
-
-    final ExecutorService executorService = ExecutorServices.create(
-        lifecycle,
-        configFactory.buildWithReplacements(
-            ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
-        )
-    );
-
-    StupidPool<ByteBuffer> computationBufferPool = ServerInit.makeComputeScratchPool(
-        Integer.parseInt(props.getProperty("druid.computation.buffer.size", String.valueOf(1024 * 1024 * 1024)))
-    );
-
-    Map<Class<? extends Query>, QueryRunnerFactory> queryRunners = ServerInit.initDefaultQueryTypes(
-        configFactory,
-        computationBufferPool
-    );
-
-    final RestS3Service s3Client = new RestS3Service(
-        new AWSCredentials(props.getProperty("com.metamx.aws.accessKey"), props.getProperty("com.metamx.aws.secretKey"))
-    );
-    QueryableLoaderConfig queryableLoaderConfig = configFactory.build(QueryableLoaderConfig.class);
-    final ServerManager serverManager = new ServerManager(
-        ServerInit.makeDefaultQueryableLoader(s3Client, queryableLoaderConfig),
-        new DefaultQueryRunnerFactoryConglomerate(queryRunners),
-        emitter,
-        executorService
-    );
-
-    final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
-
-    final DruidServer druidServer = new DruidServer(configFactory.build(DruidServerConfig.class));
-    final PhoneBook coordinatorYp = Initialization.createYellowPages(
-        jsonMapper,
-        zkClient,
-        "Coordinator-ZKYP--%s",
-        lifecycle
-    );
-    final ZkCoordinator coordinator = new ZkCoordinator(
-        jsonMapper,
-        configFactory.build(ZkCoordinatorConfig.class),
-        druidServer,
-        coordinatorYp,
-        serverManager,
-        emitter
-    );
-    lifecycle.addManagedInstance(coordinator);
-
-    final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
-
-    final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
-    final List<Monitor> monitors = Lists.<Monitor>newArrayList(
-        new ServerMonitor(druidServer, serverManager),
-        new JvmMonitor()
-    );
-    if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "true"))) {
-      monitors.add(new SysMonitor());
-    }
-
-    final MonitorScheduler healthMonitor = new MonitorScheduler(
-        configFactory.build(MonitorSchedulerConfig.class),
-        globalScheduledExec,
-        emitter,
-        monitors
-    );
-    lifecycle.addManagedInstance(healthMonitor);
-
-    final RequestLogger requestLogger = Initialization.makeRequestLogger(
-        scheduledExecutorFactory.create(
-            1,
-            "RequestLogger--%d"
-        ),
-        props
-    );
-    lifecycle.addManagedInstance(requestLogger);
-
-    try {
-      lifecycle.start();
-    }
-    catch (Throwable t) {
-      log.error(t, "Error when starting up. Failing.");
-      System.exit(1);
-    }
-
-    Runtime.getRuntime().addShutdownHook(
-        new Thread(
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
-                log.info("Running shutdown hook");
-                lifecycle.stop();
-              }
-            }
-        )
-    );
-
-    final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
-    final Context root = new Context(server, "/", Context.SESSIONS);
-
-    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
-    root.addServlet(
-        new ServletHolder(new QueryServlet(jsonMapper, smileMapper, serverManager, emitter, requestLogger)),
-        "/*"
-    );
-
-    server.start();
-    server.join();
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
+    System.out.println("K thx bye.");
+    ComputeMain.main(args);
  }
}

@@ -53,7 +53,6 @@ import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.SegmentInventoryManager;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
-import com.metamx.druid.coordination.legacy.TheSizeAdjuster;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@@ -78,7 +77,6 @@ public class DruidMaster
  private final DruidClusterInfo clusterInfo;
  private final DatabaseSegmentManager databaseSegmentManager;
  private final ServerInventoryManager serverInventoryManager;
-  private final TheSizeAdjuster sizeAdjuster;
  private final PhoneBook yp;
  private final ServiceEmitter emitter;
  private final ScheduledExecutorService exec;
@@ -98,7 +96,6 @@ public class DruidMaster
      ObjectMapper jsonMapper,
      DatabaseSegmentManager databaseSegmentManager,
      ServerInventoryManager serverInventoryManager,
-      TheSizeAdjuster sizeAdjuster,
      PhoneBook zkPhoneBook,
      ServiceEmitter emitter,
      ScheduledExecutorFactory scheduledExecutorFactory,
@@ -114,7 +111,6 @@ public class DruidMaster

    this.databaseSegmentManager = databaseSegmentManager;
    this.serverInventoryManager = serverInventoryManager;
-    this.sizeAdjuster = sizeAdjuster;
    this.yp = zkPhoneBook;
    this.emitter = emitter;

@@ -355,16 +351,7 @@ public class DruidMaster

    for (DataSegment dataSegment : dataSegments) {
      if (dataSegment.getSize() < 0) {
-        log.info("No size on Segment[%s], setting.", dataSegment);
-
-        DataSegment newDataSegment = sizeAdjuster.updateDescriptor(dataSegment);
-
-        if (newDataSegment == null) {
-          log.warn("newDataSegment was null with old dataSegment[%s]. Skipping.", dataSegment);
-          continue;
-        }
-
-        dataSegment = newDataSegment;
+        log.warn("No size on Segment[%s], wtf?", dataSegment);
      }
      availableSegments.add(dataSegment);
    }

@@ -33,7 +33,6 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.client.ZKPhoneBook;
-import com.metamx.druid.coordination.legacy.TheSizeAdjuster;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.phonebook.PhoneBook;
@@ -48,7 +47,6 @@ public class DruidMasterTest
  private PhoneBook yp;
  private DatabaseSegmentManager databaseSegmentManager;
  private ServerInventoryManager serverInventoryManager;
-  private TheSizeAdjuster theSizeAdjuster;
  private ScheduledExecutorFactory scheduledExecutorFactory;
  private DruidServer druidServer;
  private DataSegment segment;
@@ -70,9 +68,6 @@ public class DruidMasterTest
    databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
    EasyMock.replay(databaseSegmentManager);

-    theSizeAdjuster = EasyMock.createNiceMock(TheSizeAdjuster.class);
-    EasyMock.replay(theSizeAdjuster);
-
    scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
    EasyMock.replay(scheduledExecutorFactory);

@@ -144,7 +139,6 @@ public class DruidMasterTest
      null,
      databaseSegmentManager,
      serverInventoryManager,
-      theSizeAdjuster,
      yp,
      new NoopServiceEmitter(),
      scheduledExecutorFactory,