1) Introduce Jetty 8

2) Fix up modules based on exceptions at startup for MasterMain
This commit is contained in:
cheddar 2013-06-17 10:53:50 -07:00
parent 06f7e7e665
commit 11ea15fc1a
51 changed files with 948 additions and 556 deletions

View File

@ -86,12 +86,16 @@
<artifactId>curator-x-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>

View File

@ -43,6 +43,7 @@ import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
@ -60,10 +61,11 @@ import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
import org.joda.time.Duration;
import org.mortbay.jetty.Server;
import org.skife.config.ConfigurationObjectFactory;
import javax.validation.Validation;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
@ -83,6 +85,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private final String nodeType;
private final JsonConfigurator jsonConfigurator;
private DruidServerMetadata druidServerMetadata = null;
private ServiceEmitter emitter = null;
@ -124,6 +127,8 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
Preconditions.checkNotNull(smileMapper, "smileMapper");
Preconditions.checkNotNull(configFactory, "configFactory");
this.jsonConfigurator = new JsonConfigurator(jsonMapper, Validation.buildDefaultValidatorFactory().getValidator());
Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
this.nodeType = nodeType;
}
@ -245,6 +250,11 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return configFactory;
}
public JsonConfigurator getJsonConfigurator()
{
return jsonConfigurator;
}
public DruidServerMetadata getDruidServerMetadata()
{
initializeDruidServerMetadata();

View File

@ -1,5 +1,6 @@
package com.metamx.druid.curator.discovery;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@ -26,18 +27,18 @@ public class DiscoveryModule implements Module
@Provides @LazySingleton
public ServiceDiscovery<Void> getServiceDiscovery(
CuratorFramework curator,
CuratorDiscoveryConfig config,
Supplier<CuratorDiscoveryConfig> config,
Lifecycle lifecycle
) throws Exception
{
return Initialization.makeServiceDiscoveryClient(curator, config, lifecycle);
return Initialization.makeServiceDiscoveryClient(curator, config.get(), lifecycle);
}
@Provides @LazySingleton
public ServiceInstanceFactory<Void> getServiceInstanceFactory(
DruidNodeConfig nodeConfig
Supplier<DruidNodeConfig> nodeConfig
)
{
return Initialization.makeServiceInstanceFactory(nodeConfig);
return Initialization.makeServiceInstanceFactory(nodeConfig.get());
}
}

View File

@ -57,9 +57,9 @@ import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.Monitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.servlet.GzipFilter;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
@ -210,19 +210,22 @@ public class BrokerNode extends QueryableNode<BrokerNode>
theModules.addAll(extraModules);
final Injector injector = Guice.createInjector(theModules);
final Context root = new Context(getServer(), "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler();
root.setContextPath("/");
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())),
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())
),
"/druid/v2/*"
);
root.addFilter(GzipFilter.class, "/*", 0);
root.addFilter(GzipFilter.class, "/*", null);
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", 0);
root.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
for (String path : pathsForGuiceFilter) {
root.addFilter(GuiceFilter.class, path, 0);
root.addFilter(GuiceFilter.class, path, null);
}
}

View File

@ -33,9 +33,8 @@ import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.eclipse.jetty.server.Request;
import org.joda.time.DateTime;
import org.mortbay.jetty.Request;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;

View File

@ -23,19 +23,23 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.config.Config;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.guice.LazySingleton;
import org.skife.config.ConfigurationObjectFactory;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
/**
*/
public class ConfigFactoryModule implements Module
public class ConfigModule implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
}
@Provides @LazySingleton

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public abstract class CuratorDiscoveryConfig
public class CuratorDiscoveryConfig
{
@JsonProperty
private String path = null;

View File

@ -29,7 +29,7 @@ import javax.validation.constraints.NotNull;
/**
*/
public abstract class DruidNodeConfig
public class DruidNodeConfig
{
@NotNull
private String serviceName = null;
@ -42,8 +42,8 @@ public abstract class DruidNodeConfig
@JsonCreator
public DruidNodeConfig(
@JsonProperty("service") String serviceName,
@JsonProperty("host") String host,
@JsonProperty("service") String serviceName,
@JsonProperty("port") Integer port
)
{

View File

@ -19,6 +19,7 @@
package com.metamx.druid.initialization;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Binding;
@ -46,6 +47,7 @@ import java.util.Properties;
public class EmitterModule implements Module
{
private static final Logger log = new Logger(EmitterModule.class);
private static final String EMITTER_PROPERTY = "druid.emitter";
private final Properties props;
@ -60,7 +62,7 @@ public class EmitterModule implements Module
@Override
public void configure(Binder binder)
{
String emitterType = props.getProperty("druid.emitter", "");
String emitterType = props.getProperty(EMITTER_PROPERTY, "");
binder.install(new LogEmitterModule());
binder.install(new HttpEmitterModule());
@ -70,8 +72,9 @@ public class EmitterModule implements Module
@Provides
@LazySingleton
public ServiceEmitter getServiceEmitter(DruidNodeConfig config, Emitter emitter)
public ServiceEmitter getServiceEmitter(Supplier<DruidNodeConfig> configSupplier, Emitter emitter)
{
final DruidNodeConfig config = configSupplier.get();
final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter);
EmittingLogger.registerEmitter(retVal);
return retVal;
@ -110,7 +113,7 @@ public class EmitterModule implements Module
knownTypes.add(((Named) annotation).value());
}
}
throw new ISE("Uknown emitter type, known types[%s]", knownTypes);
throw new ISE("Uknown emitter type[%s]=[%s], known types[%s]", EMITTER_PROPERTY, emitterType, knownTypes);
}
}

View File

@ -20,16 +20,16 @@
package com.metamx.druid.initialization;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
/**
*/
public abstract class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig
public class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig
{
@JsonProperty
private Duration timeOut = new Duration("PT5m");
private Period timeOut = new Period("PT5M");
public Duration getReadTimeout()
public Period getReadTimeout()
{
return timeOut;
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.google.inject.util.Providers;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
@ -43,6 +44,9 @@ public class HttpEmitterModule implements Module
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter.http", HttpEmitterConfig.class);
// Fix the injection of this if we want to enable ssl emission of events.
binder.bind(SSLContext.class).toProvider(Providers.<SSLContext>of(null)).in(LazySingleton.class);
}
@Provides @LazySingleton @Named("http")
@ -51,7 +55,7 @@ public class HttpEmitterModule implements Module
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(1)
.withReadTimeout(config.get().getReadTimeout());
.withReadTimeout(config.get().getReadTimeout().toStandardDuration());
if (sslContext != null) {
builder.withSslContext(sslContext);

View File

@ -21,8 +21,14 @@ package com.metamx.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
@ -33,9 +39,12 @@ import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.guice.DruidGuiceExtensions;
import com.metamx.druid.guice.DruidSecondaryModule;
import com.metamx.druid.http.EmittingRequestLogger;
import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.jackson.JacksonModule;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.core.Emitter;
import org.apache.curator.framework.CuratorFramework;
@ -45,17 +54,20 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.zookeeper.data.Stat;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.thread.QueuedThreadPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.skife.config.ConfigurationObjectFactory;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
@ -370,4 +382,46 @@ public class Initialization
return new AddressPortServiceInstanceFactory(address, config.getPort());
}
public static Injector makeInjector(final Object... modules)
{
final Injector baseInjector = Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigModule()
);
List<Object> actualModules = Lists.newArrayList();
actualModules.add(DruidSecondaryModule.class);
actualModules.addAll(Arrays.asList(modules));
return Guice.createInjector(
Lists.transform(
actualModules,
new Function<Object, Module>()
{
@Override
@SuppressWarnings("unchecked")
public Module apply(@Nullable Object input)
{
if (input instanceof Module) {
baseInjector.injectMembers(input);
return (Module) input;
}
if (input instanceof Class) {
if (Module.class.isAssignableFrom((Class) input)) {
return baseInjector.getInstance((Class<? extends Module>) input);
}
else {
throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class);
}
}
throw new ISE("Unknown module type[%s]", input.getClass());
}
}
)
);
}
}

View File

@ -0,0 +1,11 @@
package com.metamx.druid.initialization;
import com.google.inject.Injector;
import org.eclipse.jetty.server.Server;
/**
*/
public interface JettyServerInitializer
{
public void initialize(Server server, Injector injector);
}

View File

@ -0,0 +1,62 @@
package com.metamx.druid.initialization;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.guice.ConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import org.eclipse.jetty.server.Server;
/**
*/
public class JettyServerModule implements Module
{
private static final Logger log = new Logger(JettyServerModule.class);
private final JettyServerInitializer initializer;
public JettyServerModule(
JettyServerInitializer initializer
)
{
this.initializer = initializer;
}
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, ServerConfig.class);
}
@Provides @LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, ServerConfig config)
{
final Server server = Initialization.makeJettyServer(config);
initializer.initialize(server, injector);
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
server.start();
}
@Override
public void stop()
{
try {
server.stop();
}
catch (Exception e) {
log.warn(e, "Unable to stop Jetty server.");
}
}
}
);
return server;
}
}

View File

@ -67,6 +67,10 @@
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>

View File

@ -1,5 +1,6 @@
package com.metamx.druid.config;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@ -7,6 +8,7 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.db.DbTablesConfig;
import org.joda.time.Duration;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@ -34,7 +36,7 @@ public class ConfigManager
private boolean started = false;
private final IDBI dbi;
private final ConfigManagerConfig config;
private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec;
private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
@ -44,17 +46,20 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;
@Inject
public ConfigManager(IDBI dbi, ConfigManagerConfig config) // TODO: use DbTables and a different config
public ConfigManager(IDBI dbi, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.dbi = dbi;
this.config = config;
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
this.watchedConfigs = Maps.newConcurrentMap();
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", config.getConfigTable());
final String configTable = dbTables.get().getConfigTable();
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
insertStatement = String.format(
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
config.getConfigTable()
configTable
);
}
@ -67,7 +72,9 @@ public class ConfigManager
}
poller = new PollingCallable();
ScheduledExecutors.scheduleWithFixedDelay(exec, new Duration(0), config.getPollDuration(), poller);
ScheduledExecutors.scheduleWithFixedDelay(
exec, new Duration(0), config.get().getPollDuration().toStandardDuration(), poller
);
started = true;
}

View File

@ -1,18 +1,20 @@
package com.metamx.druid.config;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
import javax.validation.constraints.NotNull;
/**
*/
public abstract class ConfigManagerConfig
public class ConfigManagerConfig
{
@Config("druid.database.configTable")
public abstract String getConfigTable();
@Config("druid.indexer.poll.duration")
@Default("PT1M")
public abstract Duration getPollDuration();
@JsonProperty
@NotNull
private Period pollDuration = new Period("PT1M");
public Period getPollDuration()
{
return pollDuration;
}
}

View File

@ -0,0 +1,74 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.config;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbTablesConfig;
/**
*/
public class ConfigManagerProvider implements Provider<ConfigManager>
{
private final DbConnector dbConnector;
private final Supplier<DbTablesConfig> dbTables;
private final Supplier<ConfigManagerConfig> config;
private final Lifecycle lifecycle;
@Inject
ConfigManagerProvider(
DbConnector dbConnector,
Supplier<DbTablesConfig> dbTables,
Supplier<ConfigManagerConfig> config,
Lifecycle lifecycle
)
{
this.dbConnector = dbConnector;
this.dbTables = dbTables;
this.config = config;
this.lifecycle = lifecycle;
}
@Override
public ConfigManager get()
{
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
dbConnector.createConfigTable();
}
@Override
public void stop()
{
}
}
);
return new ConfigManager(dbConnector.getDBI(), dbTables, config);
}
}

View File

@ -20,6 +20,7 @@
package com.metamx.druid.db;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
@ -165,6 +166,7 @@ public class DbConnector
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;
@Inject
public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables)
{
this.config = config;
@ -197,11 +199,32 @@ public class DbConnector
public void createSegmentTable()
{
if (config.get().isCreateTables()) {
createSegmentTable(dbi, dbTables.get().getSegmentsTable());
}
}
public void createRulesTable()
{
if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getRulesTable());
}
}
public void createConfigTable()
{
if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getConfigTable());
}
}
public void createTaskTables()
{
if (config.get().isCreateTables()) {
final DbTablesConfig dbTablesConfig = dbTables.get();
createTaskTable(dbi, dbTablesConfig.getTasksTable());
createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable());
createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable());
}
}
}

View File

@ -25,7 +25,7 @@ import javax.validation.constraints.NotNull;
/**
*/
public abstract class DbConnectorConfig
public class DbConnectorConfig
{
@JsonProperty
private boolean createTables = true;

View File

@ -12,7 +12,7 @@ public class DbTablesConfig
{
public static DbTablesConfig fromBase(String base)
{
return new DbTablesConfig(base, null, null);
return new DbTablesConfig(base, null, null, null, null, null, null);
}
@NotNull
@ -22,19 +22,38 @@ public class DbTablesConfig
private final String segmentsTable;
@NotNull
private final String ruleTable;
private final String rulesTable;
@NotNull
private final String configTable;
@NotNull
private final String tasksTable;
@NotNull
private final String taskLogTable;
@NotNull
private final String taskLockTable;
@JsonCreator
public DbTablesConfig(
@JsonProperty("base") String base,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable
@JsonProperty("rules") String rulesTable,
@JsonProperty("config") String configTable,
@JsonProperty("tasks") String tasksTable,
@JsonProperty("taskLog") String taskLogTable,
@JsonProperty("taskLock") String taskLockTable
)
{
this.base = base;
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.ruleTable = makeTableName(rulesTable, "rules");
this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config");
this.tasksTable = makeTableName(tasksTable, "tasks");
this.taskLogTable = makeTableName(taskLogTable, "task_log");
this.taskLockTable = makeTableName(taskLockTable, "task_lock");
}
private String makeTableName(String explicitTableName, String defaultSuffix)
@ -64,6 +83,30 @@ public class DbTablesConfig
@JsonProperty("rules")
public String getRulesTable()
{
return ruleTable;
return rulesTable;
}
@JsonProperty("config")
public String getConfigTable()
{
return configTable;
}
@JsonProperty("tasks")
public String getTasksTable()
{
return tasksTable;
}
@JsonProperty("taskLog")
public String getTaskLogTable()
{
return taskLogTable;
}
@JsonProperty("taskLock")
public String getTaskLockTable()
{
return taskLockTable;
}
}

View File

@ -4,15 +4,18 @@ import com.google.common.base.Preconditions;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.common.logger.Logger;
import org.skife.config.ConfigurationObjectFactory;
/**
*/
public class ConfigProvider<T> implements Provider<T>
{
private static final Logger log = new Logger(ConfigProvider.class);
public static <T> void bind(Binder binder, Class<T> clazz)
{
binder.bind(clazz).toProvider(of(clazz)).in(DruidScopes.SINGLETON);
binder.bind(clazz).toProvider(of(clazz)).in(LazySingleton.class);
}
public static <T> Provider<T> of(Class<T> clazz)
@ -34,8 +37,14 @@ public class ConfigProvider<T> implements Provider<T>
@Inject
public void inject(ConfigurationObjectFactory factory)
{
try {
object = factory.build(clazz);
}
catch (IllegalArgumentException e) {
log.info("Unable to build instance of class[%s]", clazz);
throw e;
}
}
@Override
public T get()

View File

@ -8,6 +8,7 @@ import com.metamx.druid.jackson.Json;
import com.metamx.druid.jackson.Smile;
import org.skife.config.ConfigurationObjectFactory;
import javax.validation.Validator;
import java.util.Properties;
/**
@ -18,19 +19,25 @@ public class DruidSecondaryModule implements Module
private final ConfigurationObjectFactory factory;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final Validator validator;
private final JsonConfigurator jsonConfigurator;
@Inject
public DruidSecondaryModule(
Properties properties,
ConfigurationObjectFactory factory,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper
@Smile ObjectMapper smileMapper,
Validator validator,
JsonConfigurator jsonConfigurator
)
{
this.properties = properties;
this.factory = factory;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.validator = validator;
this.jsonConfigurator = jsonConfigurator;
}
@Override
@ -41,5 +48,7 @@ public class DruidSecondaryModule implements Module
binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMapper);
binder.bind(ObjectMapper.class).annotatedWith(Smile.class).toInstance(smileMapper);
binder.bind(Validator.class).toInstance(validator);
binder.bind(JsonConfigurator.class).toInstance(jsonConfigurator);
}
}

View File

@ -19,36 +19,26 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.base.Suppliers;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.google.inject.util.Types;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
*/
public class JsonConfigProvider<T> implements Provider<Supplier<T>>
{
private static final Joiner JOINER = Joiner.on(", ");
@SuppressWarnings("unchecked")
public static <T> void bind(Binder binder, String propertyBase, Class<T> classToProvide)
{
binder.bind(new TypeLiteral<Supplier<T>>(){}).toProvider(of(propertyBase, classToProvide)).in(DruidScopes.SINGLETON);
binder.bind(Key.get(Types.newParameterizedType(Supplier.class, classToProvide)))
.toProvider((Provider) of(propertyBase, classToProvide))
.in(LazySingleton.class);
}
public static <T> JsonConfigProvider<T> of(String propertyBase, Class<T> classToProvide)
@ -59,7 +49,8 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
private final String propertyBase;
private final Class<T> classToProvide;
private Supplier<T> supplier;
private Properties props;
private JsonConfigurator configurator;
public JsonConfigProvider(
String propertyBase,
@ -73,49 +64,17 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
@Inject
public void inject(
Properties props,
ObjectMapper jsonMapper,
Validator validator
JsonConfigurator configurator
)
{
Map<String, Object> jsonMap = Maps.newHashMap();
for (String prop : props.stringPropertyNames()) {
if (prop.startsWith(propertyBase)) {
final String propValue = props.getProperty(prop);
try {
jsonMap.put(prop.substring(propertyBase.length()), jsonMapper.readValue(propValue, Object.class));
}
catch (IOException e) {
throw new IAE("Unable to parse an object out of prop[%s]=[%s]", prop, propValue);
}
}
}
final T config = jsonMapper.convertValue(jsonMap, classToProvide);
final Set<ConstraintViolation<T>> violations = validator.validate(config);
if (!violations.isEmpty()) {
List<String> messages = Lists.newArrayList();
for (ConstraintViolation<T> violation : violations) {
messages.add(String.format("%s - %s", violation.getPropertyPath().toString(), violation.getMessage()));
}
throw new ISE("Configuration violations[%s]", JOINER.join(messages));
}
this.supplier = new Supplier<T>()
{
@Override
public T get()
{
return config;
}
};
this.props = props;
this.configurator = configurator;
}
@Override
public Supplier<T> get()
{
return supplier;
final T config = configurator.configurate(props, propertyBase, classToProvide);
return Suppliers.ofInstance(config);
}
}

View File

@ -0,0 +1,94 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.ProvisionException;
import com.google.inject.spi.Message;
import com.metamx.common.logger.Logger;
import javax.annotation.Nullable;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
*/
public class JsonConfigurator
{
private static final Logger log = new Logger(JsonConfigurator.class);
private static final Joiner JOINER = Joiner.on("; ");
private final ObjectMapper jsonMapper;
private final Validator validator;
@Inject
public JsonConfigurator(
ObjectMapper jsonMapper,
Validator validator
)
{
this.jsonMapper = jsonMapper;
this.validator = validator;
}
public <T> T configurate(Properties props, String propertyPrefix, Class<T> clazz) throws ProvisionException
{
// Make it end with a period so we only include properties with sub-object thingies.
final String propertyBase = propertyPrefix.endsWith(".") ? propertyPrefix : propertyPrefix + ".";
Map<String, Object> jsonMap = Maps.newHashMap();
for (String prop : props.stringPropertyNames()) {
if (prop.startsWith(propertyBase)) {
final String propValue = props.getProperty(prop);
Object value;
try {
value = jsonMapper.readValue(propValue, Object.class);
}
catch (IOException e) {
log.debug("Unable to parse [%s]=[%s] as a json object, using as is.", prop, propValue);
value = propValue;
}
jsonMap.put(prop.substring(propertyBase.length()), value);
}
}
final T config = jsonMapper.convertValue(jsonMap, clazz);
final Set<ConstraintViolation<T>> violations = validator.validate(config);
if (!violations.isEmpty()) {
List<String> messages = Lists.newArrayList();
for (ConstraintViolation<T> violation : violations) {
messages.add(String.format("%s - %s", violation.getPropertyPath().toString(), violation.getMessage()));
}
throw new ProvisionException(
Iterables.transform(
messages,
new Function<String, Message>()
{
@Nullable
@Override
public Message apply(@Nullable String input)
{
return new Message(String.format("%s%s", propertyBase, input));
}
}
)
);
}
return config;
}
}

View File

@ -29,99 +29,6 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
</dependency>
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
@ -137,18 +44,6 @@
<artifactId>twitter4j-stream</artifactId>
<version>2.2.6</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -145,8 +145,8 @@
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
@ -51,6 +52,7 @@ import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
@ -115,18 +117,17 @@ import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.ResourceCollection;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.joda.time.Duration;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.resource.ResourceCollection;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.IDBI;
import java.io.IOException;
import java.io.InputStream;
@ -153,7 +154,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private List<Monitor> monitors = null;
private ServiceEmitter emitter = null;
private DbConnectorConfig dbConnectorConfig = null;
private IDBI dbi = null;
private DbConnector dbi = null;
private Supplier<DbTablesConfig> dbTables = null;
private IndexerCoordinatorConfig config = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private ServiceDiscovery<Void> serviceDiscovery = null;
@ -238,19 +240,30 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
return this;
}
public Supplier<DbTablesConfig> getDbTables()
{
if (dbTables == null) {
dbTables = Suppliers.ofInstance(
getJsonConfigurator().configurate(getProps(), "druid.database.tables", DbTablesConfig.class)
);
}
return dbTables;
}
public void doInit() throws Exception
{
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(getLifecycle());
initializeDB();
final ConfigManagerConfig managerConfig = getConfigFactory().build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, managerConfig.getConfigTable());
dbi.createConfigTable();
JacksonConfigManager configManager =
new JacksonConfigManager(
getLifecycle().addManagedInstance(
new ConfigManager(
dbi,
managerConfig
dbi.getDBI(),
getDbTables(),
Suppliers.ofInstance(managerConfig)
)
), getJsonMapper()
);
@ -296,7 +309,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
)
);
final Context staticContext = new Context(server, "/static", Context.SESSIONS);
final ServletContextHandler staticContext = new ServletContextHandler(server, "/static", ServletContextHandler.SESSIONS);
staticContext.addServlet(new ServletHolder(new DefaultServlet()), "/*");
ResourceCollection resourceCollection = new ResourceCollection(
@ -309,7 +322,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
// If we want to support querying tasks (e.g. for realtime in local mode), we need a QueryServlet here.
final Context root = new Context(server, "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/druid/*");
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*"); // backwards compatibility
@ -343,10 +356,10 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
}
}
)
), "/*", 0
), "/*", null
);
root.addFilter(GuiceFilter.class, "/druid/indexer/v1/*", 0);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0); //backwards compatibility, soon to be removed
root.addFilter(GuiceFilter.class, "/druid/indexer/v1/*", null);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", null); //backwards compatibility, soon to be removed
initialized = true;
}
@ -533,7 +546,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
dbConnectorConfig = getConfigFactory().build(DbConnectorConfig.class);
}
if (dbi == null) {
dbi = new DbConnector(Suppliers.ofInstance(dbConnectorConfig), null).getDBI(); // TODO
dbi = new DbConnector(Suppliers.ofInstance(dbConnectorConfig), null); // TODO
}
}
@ -560,8 +573,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
mergerDBCoordinator = new MergerDBCoordinator(
getJsonMapper(),
dbConnectorConfig,
null, // TODO
dbi
getDbTables().get(),
dbi.getDBI()
);
}
}
@ -610,15 +623,9 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
taskStorage = new HeapMemoryTaskStorage();
} else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = getConfigFactory().build(IndexerDbConnectorConfig.class);
DbConnector.createTaskTable(dbi, dbConnectorConfig.getTaskTable());
DbConnector.createTaskLogTable(dbi, dbConnectorConfig.getTaskLogTable());
DbConnector.createTaskLockTable(dbi, dbConnectorConfig.getTaskLockTable());
dbi.createTaskTables();
taskStorage = new DbTaskStorage(
getJsonMapper(),
dbConnectorConfig,
new DbConnector(Suppliers.<DbConnectorConfig>ofInstance(dbConnectorConfig), null).getDBI() // TODO
);
taskStorage = new DbTaskStorage(getJsonMapper(), dbConnectorConfig, dbi.getDBI());
} else {
throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());
}

View File

@ -41,10 +41,8 @@ import com.metamx.druid.indexing.coordinator.TaskQueue;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.service.ServiceEmitter;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
@ -89,8 +87,6 @@ public class IndexerCoordinatorResource
}
};
private final IndexerCoordinatorConfig config;
private final ServiceEmitter emitter;
private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogProvider taskLogProvider;
@ -101,8 +97,6 @@ public class IndexerCoordinatorResource
@Inject
public IndexerCoordinatorResource(
IndexerCoordinatorConfig config,
ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
@ -110,8 +104,6 @@ public class IndexerCoordinatorResource
ObjectMapper jsonMapper
) throws Exception
{
this.config = config;
this.emitter = emitter;
this.taskMasterLifecycle = taskMasterLifecycle;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogProvider = taskLogProvider;

View File

@ -6,8 +6,6 @@ import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.emitter.service.ServiceEmitter;
import javax.ws.rs.Path;
@ -19,8 +17,6 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource
{
@Inject
public OldIndexerCoordinatorResource(
IndexerCoordinatorConfig config,
ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
@ -28,6 +24,6 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource
ObjectMapper jsonMapper
) throws Exception
{
super(config, emitter, taskMasterLifecycle, taskStorageQueryAdapter, taskLogProvider, configManager, jsonMapper);
super(taskMasterLifecycle, taskStorageQueryAdapter, taskLogProvider, configManager, jsonMapper);
}
}

View File

@ -77,13 +77,13 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceProvider;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
@ -216,12 +216,12 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
chatHandlerProvider
)
);
final Context root = new Context(server, "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0);
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", null);
root.addServlet(
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())

View File

@ -37,10 +37,6 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import com.metamx.druid.indexing.common.config.TaskLogConfig;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
@ -54,6 +50,10 @@ import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.indexing.worker.WorkerCuratorCoordinator;
import com.metamx.druid.indexing.worker.WorkerTaskMonitor;
import com.metamx.druid.indexing.worker.config.WorkerConfig;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -70,14 +70,14 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceProvider;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.joda.time.Duration;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
@ -204,12 +204,12 @@ public class WorkerNode extends QueryableNode<WorkerNode>
forkingTaskRunner
)
);
final Context root = new Context(server, "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0);
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", null);
}
@LifecycleStart

30
pom.xml
View File

@ -230,6 +230,11 @@
<artifactId>hibernate-validator</artifactId>
<version>5.0.1.Final</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>1.1.0.Final</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
@ -266,20 +271,19 @@
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>8.1.11.v20130520</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>8.1.11.v20130520</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>8.1.11.v20130520</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
@ -299,7 +303,7 @@
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>

View File

@ -98,8 +98,8 @@
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>

View File

@ -48,8 +48,8 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.io.File;
@ -148,7 +148,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
startMonitoring(monitors);
final Context root = new Context(getServer(), "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler(getServer(), "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(

View File

@ -146,8 +146,8 @@
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>

View File

@ -118,7 +118,7 @@ public class DatabaseRuleManager
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final DatabaseRuleManagerConfig config;
private final Supplier<DatabaseRuleManagerConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final IDBI dbi;
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
@ -130,7 +130,7 @@ public class DatabaseRuleManager
@Inject
public DatabaseRuleManager(
ObjectMapper jsonMapper,
DatabaseRuleManagerConfig config,
Supplier<DatabaseRuleManagerConfig> config,
Supplier<DbTablesConfig> dbTables,
IDBI dbi
)
@ -155,11 +155,11 @@ public class DatabaseRuleManager
return;
}
createDefaultRule(dbi, getRulesTable(), config.getDefaultTier(), jsonMapper);
createDefaultRule(dbi, getRulesTable(), config.get().getDefaultTier(), jsonMapper);
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.getRulesPollDuration(),
config.get().getPollDuration().toStandardDuration(),
new Runnable()
{
@Override
@ -267,8 +267,8 @@ public class DatabaseRuleManager
if (theRules.get(dataSource) != null) {
retVal.addAll(theRules.get(dataSource));
}
if (theRules.get(config.getDefaultTier()) != null) {
retVal.addAll(theRules.get(config.getDefaultTier()));
if (theRules.get(config.get().getDefaultTier()) != null) {
retVal.addAll(theRules.get(config.get().getDefaultTier()));
}
return retVal;
}

View File

@ -19,19 +19,26 @@
package com.metamx.druid.db;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
/**
*/
public abstract class DatabaseRuleManagerConfig
public class DatabaseRuleManagerConfig
{
@Config("druid.database.rules.defaultTier")
@Default("_default")
public abstract String getDefaultTier();
@JsonProperty
private String defaultTier = "_default";
@Config("druid.database.rules.poll.duration")
@Default("PT1M")
public abstract Duration getRulesPollDuration();
@JsonProperty
private Period pollDuration = new Period("PT1M");
public String getDefaultTier()
{
return defaultTier;
}
public Period getPollDuration()
{
return pollDuration;
}
}

View File

@ -0,0 +1,79 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.db;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.common.lifecycle.Lifecycle;
/**
*/
public class DatabaseRuleManagerProvider implements Provider<DatabaseRuleManager>
{
private final ObjectMapper jsonMapper;
private final Supplier<DatabaseRuleManagerConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DbConnector dbConnector;
private final Lifecycle lifecycle;
@Inject
public DatabaseRuleManagerProvider(
ObjectMapper jsonMapper,
Supplier<DatabaseRuleManagerConfig> config,
Supplier<DbTablesConfig> dbTables,
DbConnector dbConnector,
Lifecycle lifecycle
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.dbTables = dbTables;
this.dbConnector = dbConnector;
this.lifecycle = lifecycle;
}
@Override
public DatabaseRuleManager get()
{
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
dbConnector.createRulesTable();
DatabaseRuleManager.createDefaultRule(
dbConnector.getDBI(), dbTables.get().getRulesTable(), config.get().getDefaultTier(), jsonMapper
);
}
@Override
public void stop()
{
}
}
);
return new DatabaseRuleManager(jsonMapper, config, dbTables, dbConnector.getDBI());
}
}

View File

@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.db;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.common.lifecycle.Lifecycle;
/**
*/
public class DatabaseSegmentManagerProvider implements Provider<DatabaseSegmentManager>
{
private final ObjectMapper jsonMapper;
private final Supplier<DatabaseSegmentManagerConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DbConnector dbConnector;
private final Lifecycle lifecycle;
@Inject
public DatabaseSegmentManagerProvider(
ObjectMapper jsonMapper,
Supplier<DatabaseSegmentManagerConfig> config,
Supplier<DbTablesConfig> dbTables,
DbConnector dbConnector,
Lifecycle lifecycle
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.dbTables = dbTables;
this.dbConnector = dbConnector;
this.lifecycle = lifecycle;
}
@Override
public DatabaseSegmentManager get()
{
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
dbConnector.createSegmentTable();
}
@Override
public void stop()
{
}
}
);
return new DatabaseSegmentManager(
jsonMapper,
config,
dbTables,
dbConnector.getDBI()
);
}
}

View File

@ -1,6 +1,7 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@ -26,7 +27,7 @@ public class HttpClientModule implements Module
JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class);
}
public abstract static class DruidHttpClientConfig
public static class DruidHttpClientConfig
{
@JsonProperty
@Min(0)
@ -47,8 +48,14 @@ public class HttpClientModule implements Module
}
@Provides @LazySingleton @Global
public HttpClient makeHttpClient(DruidHttpClientConfig config, Lifecycle lifecycle, @Nullable SSLContext sslContext)
public HttpClient makeHttpClient(
Supplier<DruidHttpClientConfig> configSupplier,
Lifecycle lifecycle,
@Nullable SSLContext sslContext
)
{
final DruidHttpClientConfig config = configSupplier.get();
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(config.getNumConnections())

View File

@ -1,9 +1,12 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.indexing.IndexingService;
@ -11,11 +14,25 @@ import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.client.selector.DiscoverySelector;
import com.metamx.druid.client.selector.Server;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.ConfigManagerProvider;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseRuleManagerProvider;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManagerProvider;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.http.MasterRedirectInfo;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueueTaskMaster;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceProvider;
import org.skife.jdbi.v2.IDBI;
@ -27,52 +44,58 @@ public class MasterModule implements Module
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, DruidMasterConfig.class);
ConfigProvider.bind(binder, ZkPathsConfig.class);
ConfigProvider.bind(binder, ServerInventoryViewConfig.class);
ConfigProvider.bind(binder, DbConnectorConfig.class);
JsonConfigProvider.bind(binder, "druid.database.tables", DbTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.tables", DbTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", DbConnectorConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.config", ConfigManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(DatabaseRuleManager.class)
.toProvider(DatabaseRuleManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(ConfigManager.class)
.toProvider(ConfigManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
.annotatedWith(IndexingService.class)
.to(IndexingServiceSelector.class)
.in(ManageLifecycle.class);
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class);
}
@Provides @ManageLifecycle @IndexingService
public DiscoverySelector<Server> getIndexingServiceSelector(DruidMasterConfig config, ServiceDiscovery serviceDiscovery)
@Provides @LazySingleton @IndexingService
public ServiceProvider getServiceProvider(DruidMasterConfig config, ServiceDiscovery<Void> serviceDiscovery)
{
final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder()
.serviceName(config.getMergerServiceName())
.build();
return new IndexingServiceSelector(serviceProvider);
return serviceDiscovery.serviceProviderBuilder().serviceName(config.getMergerServiceName()).build();
}
@Provides @LazySingleton
public IDBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle)
public IDBI getDbi(final DbConnector dbConnector)
{
if (config.isCreateTables()) {
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
dbConnector.createSegmentTable();
dbConnector.createRulesTable();
}
@Override
public void stop()
{
}
}
);
}
return dbConnector.getDBI();
}
@Provides @LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(CuratorFramework curator, ObjectMapper jsonMapper)
{
return new LoadQueueTaskMaster(curator, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d"));
}
@Provides @LazySingleton
public ScheduledExecutorFactory getScheduledExecutorFactory(Lifecycle lifecycle)
{
return ScheduledExecutors.createFactory(lifecycle);
}
}

View File

@ -38,19 +38,15 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
@ -128,7 +124,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
monitors.add(new ServerMonitor(getDruidServerMetadata(), serverManager));
startMonitoring(monitors);
final Context root = new Context(getServer(), "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler(getServer(), "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(

View File

@ -19,67 +19,33 @@
package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.curator.discovery.DiscoveryModule;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.guice.DruidGuiceExtensions;
import com.metamx.druid.guice.DruidSecondaryModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.MasterModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.initialization.ConfigFactoryModule;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.PropertiesModule;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.JacksonModule;
import com.metamx.druid.initialization.JettyServerInitializer;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueueTaskMaster;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import org.apache.curator.framework.CuratorFramework;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.servlet.GzipFilter;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.IDBI;
import javax.annotation.Nullable;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
@ -91,185 +57,55 @@ public class MasterMain
{
LogLevelAdjuster.register();
Injector injector = makeInjector(
new LifecycleModule(Key.get(MonitorScheduler.class)),
Injector injector = Initialization.makeInjector(
new LifecycleModule(Key.get(MonitorScheduler.class), Key.get(DruidMaster.class)),
EmitterModule.class,
HttpClientModule.class,
CuratorModule.class,
MetricsModule.class,
new MetricsModule(),
DiscoveryModule.class,
ServerModule.class,
new JettyServerModule(new MasterJettyServerInitializer()),
MasterModule.class
);
final ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class);
final ConfigurationObjectFactory configFactory = injector.getInstance(ConfigurationObjectFactory.class);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
final ServiceEmitter emitter = injector.getInstance(ServiceEmitter.class);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
CuratorFramework curatorFramework = injector.getInstance(CuratorFramework.class);
final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
ServerInventoryView serverInventoryView = injector.getInstance(ServerInventoryView.class);
final DatabaseSegmentManager databaseSegmentManager = injector.getInstance(DatabaseSegmentManager.class);
final DatabaseRuleManager databaseRuleManager = injector.getInstance(DatabaseRuleManager.class);
final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class);
final DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class);
final DruidNodeConfig nodeConfig = injector.getInstance(DruidNodeConfig.class);
final ServiceAnnouncer serviceAnnouncer = injector.getInstance(ServiceAnnouncer.class);
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
IDBI dbi = injector.getInstance(IDBI.class); // TODO: make tables and stuff
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());
JacksonConfigManager configManager = new JacksonConfigManager(
new ConfigManager(dbi, configManagerConfig), jsonMapper
);
final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d")
);
final DruidMaster master = new DruidMaster(
druidMasterConfig,
zkPaths,
configManager,
databaseSegmentManager,
serverInventoryView,
databaseRuleManager,
curatorFramework,
emitter,
scheduledExecutorFactory,
injector.getInstance(IndexingServiceClient.class),
taskMaster
);
lifecycle.addManagedInstance(master);
try {
lifecycle.start();
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
lifecycle.join();
}
private static class MasterJettyServerInitializer implements JettyServerInitializer
{
@Override
public void run()
public void initialize(Server server, Injector injector)
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
);
final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
final RedirectInfo redirectInfo = new RedirectInfo()
{
@Override
public boolean doLocal()
{
return master.isClusterMaster();
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
try {
final String currentMaster = master.getCurrentMaster();
if (currentMaster == null) {
return null;
}
String location = String.format("http://%s%s", currentMaster, requestURI);
if (queryString != null) {
location = String.format("%s?%s", location, queryString);
}
return new URL(location);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
final Context staticContext = new Context(server, "/static", Context.SESSIONS);
staticContext.addServlet(new ServletHolder(new RedirectServlet(redirectInfo)), "/*");
final ServletContextHandler staticContext = new ServletContextHandler(server, "/static", ServletContextHandler.SESSIONS);
staticContext.addServlet(new ServletHolder(injector.getInstance(RedirectServlet.class)), "/*");
staticContext.setResourceBase(ComputeMain.class.getClassLoader().getResource("static").toExternalForm());
final Context root = new Context(server, "/", Context.SESSIONS);
final ServletContextHandler root = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GzipFilter.class, "/*", 0);
root.addFilter(
new FilterHolder(
new RedirectFilter(
redirectInfo
)
), "/*", 0
);
root.addFilter(GuiceFilter.class, "/info/*", 0);
root.addFilter(GuiceFilter.class, "/master/*", 0);
server.start();
server.join();
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/master/*", null);
}
private static Injector makeInjector(final Object... modules)
{
final Injector baseInjector = Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigFactoryModule()
);
List<Object> actualModules = Lists.newArrayList();
actualModules.add(DruidSecondaryModule.class);
actualModules.addAll(Arrays.asList(modules));
return Guice.createInjector(
Lists.transform(
actualModules,
new Function<Object, Module>()
{
@Override
@SuppressWarnings("unchecked")
public Module apply(@Nullable Object input)
{
if (input instanceof Module) {
baseInjector.injectMembers(input);
return (Module) input;
}
if (input instanceof Class) {
if (Module.class.isAssignableFrom((Class) input)) {
return baseInjector.getInstance((Class<? extends Module>) input);
}
else {
throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class);
}
}
throw new ISE("Unknown module type[%s]", input.getClass());
}
}
)
);
}
}

View File

@ -0,0 +1,47 @@
package com.metamx.druid.http;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.druid.master.DruidMaster;
import java.net.URL;
/**
*/
public class MasterRedirectInfo implements RedirectInfo
{
private final DruidMaster master;
@Inject
public MasterRedirectInfo(DruidMaster master) {
this.master = master;
}
@Override
public boolean doLocal()
{
return master.isClusterMaster();
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
try {
final String currentMaster = master.getCurrentMaster();
if (currentMaster == null) {
return null;
}
String location = String.format("http://%s%s", currentMaster, requestURI);
if (queryString != null) {
location = String.format("%s?%s", location, queryString);
}
return new URL(location);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.http;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import javax.servlet.Filter;
@ -40,6 +41,7 @@ public class RedirectFilter implements Filter
private final RedirectInfo redirectInfo;
@Inject
public RedirectFilter(
RedirectInfo redirectInfo
)

View File

@ -19,6 +19,7 @@
package com.metamx.druid.http;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import org.mortbay.jetty.servlet.DefaultServlet;
@ -38,6 +39,7 @@ public class RedirectServlet extends DefaultServlet
private final RedirectInfo redirectInfo;
@Inject
public RedirectServlet(
RedirectInfo redirectInfo
)

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
@ -44,6 +45,7 @@ import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
@ -68,6 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class DruidMaster
{
public static final String MASTER_OWNER_NODE = "_MASTER";
@ -94,6 +97,7 @@ public class DruidMaster
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch;
@Inject
public DruidMaster(
DruidMasterConfig config,
ZkPathsConfig zkPaths,

View File

@ -20,6 +20,7 @@
package com.metamx.druid.master;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
@ -33,6 +34,7 @@ public class LoadQueueTaskMaster
private final ObjectMapper jsonMapper;
private final ExecutorService peonExec;
@Inject
public LoadQueueTaskMaster(
CuratorFramework curator,
ObjectMapper jsonMapper,

View File

@ -0,0 +1,26 @@
package com.metamx.druid.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.metrics.MonitorSchedulerConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
/**
*/
public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
{
@JsonProperty
private Period emissionPeriod = new Period("PT1M");
@JsonProperty
public Period getEmissionPeriod()
{
return emissionPeriod;
}
@Override
public Duration getEmitterPeriod()
{
return emissionPeriod.toStandardDuration();
}
}

View File

@ -1,5 +1,6 @@
package com.metamx.druid.metrics;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
@ -8,13 +9,13 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import java.util.List;
@ -44,8 +45,10 @@ public class MetricsModule implements Module
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class);
binder.bind(JvmMonitor.class).in(LazySingleton.class);
binder.bind(SysMonitor.class).in(LazySingleton.class);
binder.bind(SysMonitor.class).in(LazySingleton.class); // TODO: allow for disabling of this monitor
for (Class<? extends Monitor> monitor : monitors) {
binder.bind(monitor).in(LazySingleton.class);
@ -53,7 +56,11 @@ public class MetricsModule implements Module
}
@Provides @ManageLifecycle
public MonitorScheduler getMonitorScheduler(MonitorSchedulerConfig config, ServiceEmitter emitter, Injector injector)
public MonitorScheduler getMonitorScheduler(
Supplier<DruidMonitorSchedulerConfig> config,
ServiceEmitter emitter,
Injector injector
)
{
List<Monitor> monitors = Lists.newArrayList();
@ -68,7 +75,7 @@ public class MetricsModule implements Module
}
return new MonitorScheduler(
config,
config.get(),
Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
emitter,
monitors