1) Adjustments to allow for the addition of new Runnables via modules. Modules should implement CliCommandCreator and add their stuff there.

This commit is contained in:
cheddar 2013-09-11 17:41:20 -05:00
parent 6c9a107356
commit 3e4a4f5566
12 changed files with 233 additions and 129 deletions

View File

@ -68,6 +68,7 @@ public class DruidSecondaryModule implements Module
@Override
public void configure(Binder binder)
{
binder.requireExplicitBindings();
binder.install(new DruidGuiceExtensions());
binder.bind(Properties.class).toInstance(properties);
binder.bind(ConfigurationObjectFactory.class).toInstance(factory);

View File

@ -22,117 +22,126 @@ package io.druid.server.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.initialization.DruidModule;
import io.druid.jackson.JacksonModule;
import io.tesla.aether.TeslaAether;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.eclipse.aether.artifact.Artifact;
import org.eclipse.aether.artifact.DefaultArtifact;
import org.eclipse.aether.collection.CollectRequest;
import org.eclipse.aether.graph.Dependency;
import org.eclipse.aether.graph.DependencyFilter;
import org.eclipse.aether.graph.DependencyNode;
import org.eclipse.aether.resolution.DependencyRequest;
import org.eclipse.aether.util.artifact.JavaScopes;
import org.eclipse.aether.util.filter.DependencyFilterUtils;
import java.io.PrintStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
/**
*/
public class Initialization
{
private static final Logger log = new Logger(Initialization.class);
private static final Map<String, ClassLoader> loadersMap = Maps.newHashMap();
private static final List<String> exclusions = Arrays.asList(
private static final Set<String> exclusions = Sets.newHashSet(
"io.druid",
"com.metamx.druid"
);
public static Injector makeInjector(final Object... modules)
{
final Injector baseInjector = Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
for (Object module : modules) {
if (module instanceof Class) {
binder.bind((Class) module);
}
}
}
}
);
ModuleList actualModules = new ModuleList(baseInjector);
actualModules.addModule(DruidSecondaryModule.class);
for (Object module : modules) {
actualModules.addModule(module);
}
addExtensionModules(baseInjector.getInstance(ExtensionsConfig.class), actualModules);
return Guice.createInjector(actualModules.getModules());
}
private static void addExtensionModules(ExtensionsConfig config, ModuleList actualModules)
public synchronized static <T> List<T> getFromExtensions(ExtensionsConfig config, Class<T> clazz)
{
final TeslaAether aether = getAetherClient(config);
List<T> retVal = Lists.newArrayList();
for (String coordinate : config.getCoordinates()) {
log.info("Loading extension[%s]", coordinate);
try {
final List<Artifact> artifacts = aether.resolveArtifacts(coordinate);
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
for (Artifact artifact : artifacts) {
if (!exclusions.contains(artifact.getGroupId())) {
urls.add(artifact.getFile().toURI().toURL());
ClassLoader loader = loadersMap.get(coordinate);
if (loader == null) {
final CollectRequest collectRequest = new CollectRequest();
collectRequest.setRoot(new Dependency(new DefaultArtifact(coordinate), JavaScopes.RUNTIME));
DependencyRequest dependencyRequest = new DependencyRequest(
collectRequest,
DependencyFilterUtils.andFilter(
DependencyFilterUtils.classpathFilter(JavaScopes.RUNTIME),
new DependencyFilter()
{
@Override
public boolean accept(DependencyNode node, List<DependencyNode> parents)
{
if (accept(node.getArtifact())) {
return false;
}
for (DependencyNode parent : parents) {
if (accept(parent.getArtifact())) {
return false;
}
}
return true;
}
private boolean accept(final Artifact artifact)
{
return exclusions.contains(artifact.getGroupId());
}
}
)
);
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
for (Artifact artifact : artifacts) {
if (!exclusions.contains(artifact.getGroupId())) {
urls.add(artifact.getFile().toURI().toURL());
}
else {
log.error("Skipped Artifact[%s]", artifact);
}
}
else {
log.debug("Skipped Artifact[%s]", artifact);
for (URL url : urls) {
log.error("Added URL[%s]", url);
}
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
loadersMap.put(coordinate, loader);
}
for (URL url : urls) {
log.debug("Added URL[%s]", url);
}
final ServiceLoader<T> serviceLoader = ServiceLoader.load(clazz, loader);
ClassLoader loader = new URLClassLoader(
urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader()
);
final ServiceLoader<DruidModule> serviceLoader = ServiceLoader.load(DruidModule.class, loader);
for (DruidModule module : serviceLoader) {
for (T module : serviceLoader) {
log.info("Adding extension module[%s]", module.getClass());
actualModules.addModule(module);
retVal.add(module);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
return retVal;
}
private static DefaultTeslaAether getAetherClient(ExtensionsConfig config)
@ -162,6 +171,22 @@ public class Initialization
}
}
public static Injector makeInjectorWithModules(final Injector baseInjector, List<Object> modules)
{
ModuleList actualModules = new ModuleList(baseInjector);
actualModules.addModule(DruidSecondaryModule.class);
for (Object module : modules) {
actualModules.addModule(module);
}
final ExtensionsConfig config = baseInjector.getInstance(ExtensionsConfig.class);
for (DruidModule module : Initialization.getFromExtensions(config, DruidModule.class)) {
actualModules.addModule(module);
}
return Guice.createInjector(actualModules.getModules());
}
private static class ModuleList
{
private final Injector baseInjector;
@ -217,5 +242,4 @@ public class Initialization
return module;
}
}
}
}

View File

@ -19,7 +19,7 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.cache.CacheMonitor;
@ -35,10 +35,11 @@ import io.druid.guice.annotations.Client;
import io.druid.server.ClientQuerySegmentWalker;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import java.util.List;
/**
*/
@Command(
@ -55,22 +56,22 @@ public class CliBroker extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule().register(CacheMonitor.class),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ClientQuerySegmentWalker.class),
new QueryToolChestModule(),
new ServerViewModule(),
new HttpClientModule("druid.broker.http", Client.class),
new BrokerModule()
return ImmutableList.<Object>of(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule().register(CacheMonitor.class),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ClientQuerySegmentWalker.class),
new QueryToolChestModule(),
new ServerViewModule(),
new HttpClientModule("druid.broker.http", Client.class),
new BrokerModule()
);
}
}

View File

@ -19,6 +19,7 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
@ -39,7 +40,6 @@ import io.druid.server.http.InfoResource;
import io.druid.server.http.MasterResource;
import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.master.DruidMaster;
@ -55,6 +55,8 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import java.util.List;
/**
*/
@Command(
@ -71,9 +73,9 @@ public class CliCoordinator extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
return ImmutableList.<Object>of(
new LifecycleModule().register(DruidMaster.class),
EmitterModule.class,
HttpClientModule.global(),

View File

@ -19,7 +19,7 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
@ -38,11 +38,12 @@ import io.druid.server.StatusResource;
import io.druid.server.coordination.ServerManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import io.druid.server.metrics.ServerMonitor;
import java.util.List;
/**
*/
@Command(
@ -59,9 +60,9 @@ public class CliHistorical extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
return ImmutableList.<Object>of(
new LifecycleModule().register(ZkCoordinator.class),
EmitterModule.class,
HttpClientModule.global(),

View File

@ -19,6 +19,7 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
@ -34,7 +35,6 @@ import io.druid.indexing.worker.WorkerTaskMonitor;
import io.druid.indexing.worker.http.WorkerResource;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
@ -47,6 +47,8 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import java.util.List;
/**
*/
@Command(
@ -63,9 +65,9 @@ public class CliMiddleManager extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
return ImmutableList.<Object>of(
new LifecycleModule().register(WorkerTaskMonitor.class),
EmitterModule.class,
HttpClientModule.global(),

View File

@ -19,6 +19,7 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
@ -38,7 +39,6 @@ import io.druid.indexing.coordinator.http.IndexerCoordinatorResource;
import io.druid.server.StatusResource;
import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
@ -54,6 +54,8 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection;
import java.util.List;
/**
*/
@Command(
@ -70,9 +72,9 @@ public class CliOverlord extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
return ImmutableList.<Object>of(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),

View File

@ -20,6 +20,8 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
@ -70,34 +72,45 @@ public class CliPeon implements Runnable
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String nodeType = "indexer-executor";
private Injector injector;
@Inject
public void configure(Injector injector)
{
this.injector = injector;
}
private static final Logger log = new Logger(CliPeon.class);
protected Injector getInjector()
{
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class)
.addResource(ChatHandlerResource.class),
new DiscoveryModule(),
new ServerViewModule(),
new StorageNodeModule(nodeType),
new DataSegmentPusherModule(),
new AnnouncerModule(),
new DruidProcessingModule(),
new QueryableModule(ThreadPoolTaskRunner.class),
new QueryRunnerFactoryModule(),
new IndexingServiceDiscoveryModule(),
new AWSModule(),
new PeonModule(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
return Initialization.makeInjectorWithModules(
injector,
ImmutableList.of(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class)
.addResource(ChatHandlerResource.class),
new DiscoveryModule(),
new ServerViewModule(),
new StorageNodeModule(nodeType),
new DataSegmentPusherModule(),
new AnnouncerModule(),
new DruidProcessingModule(),
new QueryableModule(ThreadPoolTaskRunner.class),
new QueryRunnerFactoryModule(),
new IndexingServiceDiscoveryModule(),
new AWSModule(),
new PeonModule(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
)
)
);
}

View File

@ -19,7 +19,7 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
@ -39,10 +39,11 @@ import io.druid.guice.StorageNodeModule;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import java.util.List;
/**
*/
@Command(
@ -59,9 +60,9 @@ public class CliRealtime extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
return ImmutableList.<Object>of(
new LifecycleModule(),
EmitterModule.class,
DbConnectorModule.class,

View File

@ -19,7 +19,7 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import druid.examples.guice.RealtimeExampleModule;
import io.airlift.command.Command;
@ -32,9 +32,10 @@ import io.druid.guice.StorageNodeModule;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerModule;
import java.util.List;
/**
*/
@Command(
@ -51,9 +52,9 @@ public class CliRealtimeExample extends ServerRunnable
}
@Override
protected Injector getInjector()
protected List<Object> getModules()
{
return Initialization.makeInjector(
return ImmutableList.of(
new LifecycleModule(),
EmitterModule.class,
DruidProcessingModule.class,

View File

@ -1,6 +1,6 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -19,9 +19,23 @@
package io.druid.cli;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import io.airlift.command.Cli;
import io.airlift.command.Help;
import io.airlift.command.ParseException;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.jackson.JacksonModule;
import io.druid.server.initialization.ConfigModule;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.PropertiesModule;
import java.util.List;
/**
*/
@ -54,9 +68,19 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(CliPeon.class);
final Injector injector = makeStartupInjector();
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
final List<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);
for (CliCommandCreator creator : extensionCommands) {
creator.addCommands(builder);
}
final Cli<Runnable> cli = builder.build();
try {
cli.parse(args).run();
final Runnable command = cli.parse(args);
injector.injectMembers(command);
command.run();
}
catch (ParseException e) {
System.out.println("ERROR!!!!");
@ -65,4 +89,23 @@ public class Main
cli.parse(new String[]{"help"}).run();
}
}
public static Injector makeStartupInjector()
{
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
}
}
);
}
}

View File

@ -20,10 +20,14 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.initialization.LogLevelAdjuster;
import io.druid.server.initialization.Initialization;
import java.util.List;
/**
*/
@ -31,12 +35,20 @@ public abstract class ServerRunnable implements Runnable
{
private final Logger log;
private Injector baseInjector;
public ServerRunnable(Logger log)
{
this.log = log;
}
protected abstract Injector getInjector();
@Inject
public void configure(Injector injector)
{
this.baseInjector = injector;
}
protected abstract List<Object> getModules();
@Override
public void run()
@ -44,7 +56,8 @@ public abstract class ServerRunnable implements Runnable
try {
LogLevelAdjuster.register();
final Lifecycle lifecycle = getInjector().getInstance(Lifecycle.class);
final Injector injector = Initialization.makeInjectorWithModules(baseInjector, getModules());
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {
lifecycle.start();