Make it possible to start a Peon without DI loading of any querying-related stuff (#4516)

* Make QueryRunnerFactoryConglomerate injection lazy in TaskToolbox/TaskToolboxFactory

* Extract QueryablePeonModule and add druid.modules.excludeList config

* Typo
Roman Leventov 2017-07-12 21:18:25 +03:00 committed by Himanshu
parent 53e6b5cb9b
commit b2865b7c7b
17 changed files with 150 additions and 42 deletions

View File

@@ -24,11 +24,16 @@ Many of Druid's external dependencies can be plugged in as modules. Extensions c
|`druid.extensions.directory`|The root extension directory where the user can put extension-related files. Druid will load extensions stored under this directory.|`extensions` (This is a relative path to Druid's working directory)|
|`druid.extensions.hadoopDependenciesDir`|The root Hadoop dependencies directory where the user can put Hadoop-related dependency files. Druid will load the dependencies based on the Hadoop coordinate specified in the Hadoop index task.|`hadoop-dependencies` (This is a relative path to Druid's working directory)|
|`druid.extensions.loadList`|A JSON array of extensions for Druid to load from the extension directories. If it is not specified, its value will be `null` and Druid will load all the extensions under `druid.extensions.directory`. If its value is the empty list `[]`, then no extensions will be loaded at all. It is also allowed to specify absolute paths of custom extensions not stored in the common extensions directory.|null|
|`druid.extensions.moduleExcludeList`|A JSON array of canonical class names (e.g. `"io.druid.somepackage.SomeModule"`) of module classes which shouldn't be loaded, even if they are found in extensions specified by `druid.extensions.loadList`. Useful when an otherwise useful extension contains a module that shouldn't be loaded on some Druid node type because that module's dependencies cannot be satisfied there.|[]|
|`druid.extensions.searchCurrentClassloader`|This is a boolean flag that determines if Druid will search the main classloader for extensions. It defaults to true but can be turned off if you have reason to not automatically add all modules on the classpath.|true|
|`druid.extensions.hadoopContainerDruidClasspath`|Hadoop indexing launches Hadoop jobs, and this configuration provides a way to explicitly set the user classpath for the Hadoop job. By default this is computed automatically by Druid based on the Druid process classpath and the set of extensions. However, sometimes you might want to be explicit to resolve dependency conflicts between Druid and Hadoop.|null|
|`druid.extensions.addExtensionsToHadoopContainer`|Only applicable if `druid.extensions.hadoopContainerDruidClasspath` is provided. If set to true, then extensions specified in the loadList are added to the Hadoop container classpath. Note that when `druid.extensions.hadoopContainerDruidClasspath` is not provided, extensions are always added to the Hadoop container classpath.|false|
### Modules
|Property|Description|Default|
|--------|-----------|-------|
|`druid.modules.excludeList`|A JSON array of canonical class names (e.g. `"io.druid.somepackage.SomeModule"`) of module classes which shouldn't be loaded, even if they are found in extensions specified by `druid.extensions.loadList`, or in the list of core modules loaded on a particular Druid node type. Useful when an otherwise useful extension contains a module that shouldn't be loaded on some Druid node type because that module's dependencies cannot be satisfied there.|[]|
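For example, adding `druid.modules.excludeList=["io.druid.somepackage.SomeModule"]` to a node's runtime properties keeps that (hypothetical) module from being installed on that node, even though the extension containing it remains on `druid.extensions.loadList`.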
### Zookeeper
We recommend just setting the base ZK path and the ZK service host, but all ZK paths that Druid uses can be overridden with absolute paths.

View File

@@ -1534,7 +1534,7 @@ public class KafkaIndexTaskTest
new TestDataSegmentAnnouncer(),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
makeTimeseriesOnlyConglomerate(),
this::makeTimeseriesOnlyConglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(

View File

@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.inject.Provider;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
@@ -33,7 +34,6 @@ import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@@ -62,7 +62,6 @@ import java.util.concurrent.ExecutorService;
public class TaskToolbox
{
private final TaskConfig config;
private final Task task;
private final TaskActionClient taskActionClient;
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
@@ -72,7 +71,12 @@ public class TaskToolbox
private final DataSegmentAnnouncer segmentAnnouncer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
/**
* Using a Provider rather than {@link QueryRunnerFactoryConglomerate} directly, so that {@link
* io.druid.indexing.overlord.TaskRunner} implementations that create TaskToolboxes are not forced to inject the
* query machinery eagerly; it may be unavailable, e.g. for batch tasks running in Spark or Hadoop.
*/
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
private final SegmentLoader segmentLoader;
@@ -85,7 +89,6 @@ public class TaskToolbox
public TaskToolbox(
TaskConfig config,
Task task,
TaskActionClient taskActionClient,
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
@@ -95,7 +98,7 @@ public class TaskToolbox
DataSegmentAnnouncer segmentAnnouncer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
@@ -108,7 +111,6 @@ public class TaskToolbox
)
{
this.config = config;
this.task = task;
this.taskActionClient = taskActionClient;
this.emitter = emitter;
this.segmentPusher = segmentPusher;
@@ -118,7 +120,7 @@ public class TaskToolbox
this.segmentAnnouncer = segmentAnnouncer;
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
@@ -182,7 +184,7 @@ public class TaskToolbox
public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate()
{
return queryRunnerFactoryConglomerate;
return queryRunnerFactoryConglomerateProvider.get();
}
public ExecutorService getQueryExecutorService()
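To illustrate the pattern that the new javadoc above relies on: `com.google.inject.Provider` has a single `get()` method, so the conglomerate is only created (or even required to be creatable) when a task actually asks for it, and callers can supply it as a lambda. A minimal standalone sketch, not part of this patch (all class names below are made up for illustration):

import com.google.inject.Provider;

// A dependency that is expensive, or impossible, to construct in some environments.
class ExpensiveDependency
{
  ExpensiveDependency()
  {
    System.out.println("ExpensiveDependency constructed");
  }
}

// A holder that accepts a Provider, so construction is deferred until the dependency is actually requested.
class Holder
{
  private final Provider<ExpensiveDependency> provider;

  Holder(Provider<ExpensiveDependency> provider)
  {
    this.provider = provider;
  }

  ExpensiveDependency getDependency()
  {
    // The dependency is created here, not when the Holder itself is created.
    return provider.get();
  }
}

class ProviderSketch
{
  public static void main(String[] args)
  {
    Holder holder = new Holder(ExpensiveDependency::new); // nothing constructed yet
    holder.getDependency();                               // prints "ExpensiveDependency constructed"
  }
}

The test changes in this commit use the same trick, passing lambdas such as `() -> conglomerate` or `this::makeTimeseriesOnlyConglomerate` where a `Provider<QueryRunnerFactoryConglomerate>` is expected.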

View File

@@ -22,6 +22,7 @@ package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
@@ -59,7 +60,7 @@ public class TaskToolboxFactory
private final DataSegmentAnnouncer segmentAnnouncer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler;
private final SegmentLoaderFactory segmentLoaderFactory;
@@ -81,7 +82,7 @@ public class TaskToolboxFactory
DataSegmentAnnouncer segmentAnnouncer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoaderFactory segmentLoaderFactory,
@@ -102,7 +103,7 @@ public class TaskToolboxFactory
this.segmentAnnouncer = segmentAnnouncer;
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
this.segmentLoaderFactory = segmentLoaderFactory;
@@ -118,7 +119,6 @@ public class TaskToolboxFactory
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox(
config,
task,
taskActionClientFactory.create(task),
emitter,
segmentPusher,
@@ -128,7 +128,7 @@ public class TaskToolboxFactory
segmentAnnouncer,
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerate,
queryRunnerFactoryConglomerateProvider,
queryExecutorService,
monitorScheduler,
segmentLoaderFactory.manufacturate(taskWorkDir),

View File

@@ -103,7 +103,7 @@ public class TaskToolboxTest
mockSegmentAnnouncer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
mockHandoffNotifierFactory,
mockQueryRunnerFactoryConglomerate,
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockMonitorScheduler,
new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),

View File

@@ -896,7 +896,7 @@ public class IndexTaskTest
indexTask.run(
new TaskToolbox(
null, null, new TaskActionClient()
null, new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException

View File

@@ -1025,7 +1025,7 @@ public class RealtimeIndexTaskTest
new TestDataSegmentAnnouncer(),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
conglomerate,
() -> conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(

View File

@@ -140,7 +140,7 @@ public class SameIntervalMergeTaskTest
mergeTask.run(
new TaskToolbox(
null, null, new TaskActionClient()
null, new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException

View File

@@ -577,7 +577,7 @@ public class TaskLifecycleTest
}, // segment announcer
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
MoreExecutors.sameThreadExecutor(), // query executor service
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(

View File

@@ -22,7 +22,6 @@ package io.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.List;
/**
@@ -49,13 +48,6 @@ public class ExtensionsConfig
@JsonProperty
private List<String> loadList;
/**
* Canonical class names of modules that should not be loaded even though they are found in extensions from {@link
* #loadList}.
*/
@JsonProperty
private List<String> moduleExcludeList = Collections.emptyList();
public boolean searchCurrentClassloader()
{
return searchCurrentClassloader;
@@ -86,11 +78,6 @@ public class ExtensionsConfig
return loadList;
}
public List<String> getModuleExcludeList()
{
return moduleExcludeList;
}
@Override
public String toString()
{
@@ -101,7 +88,6 @@ public class ExtensionsConfig
", hadoopContainerDruidClasspath='" + hadoopContainerDruidClasspath + '\'' +
", addExtensionsToHadoopContainer=" + addExtensionsToHadoopContainer +
", loadList=" + loadList +
", moduleExcludeList=" + moduleExcludeList +
'}';
}
}

View File

@@ -49,6 +49,7 @@ public class GuiceInjectors
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
JsonConfigProvider.bind(binder, "druid.modules", ModulesConfig.class);
}
}
);

View File

@@ -0,0 +1,49 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.List;
public class ModulesConfig
{
/**
* Canonical class names of modules that should not be loaded, even though they are found in extensions from {@link
* io.druid.guice.ExtensionsConfig#loadList} or in the standard list of modules loaded by a particular node type, e.g.
* {@code CliPeon}.
*/
@JsonProperty
private List<String> excludeList = Collections.emptyList();
public List<String> getExcludeList()
{
return excludeList;
}
@Override
public String toString()
{
return "ModulesConfig{" +
"excludeList=" + excludeList +
'}';
}
}
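As a rough illustration of what the javadoc above describes (a standalone sketch, not part of this patch; Druid's real wiring goes through `JsonConfigProvider.bind(binder, "druid.modules", ModulesConfig.class)` as shown in the GuiceInjectors change above), the JSON value of `druid.modules.excludeList` can be read straight into this class with Jackson:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.guice.ModulesConfig;

public class ModulesConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Jackson populates the @JsonProperty-annotated field directly.
    ModulesConfig config = mapper.readValue(
        "{\"excludeList\": [\"io.druid.somepackage.SomeModule\"]}",
        ModulesConfig.class
    );
    System.out.println(config); // ModulesConfig{excludeList=[io.druid.somepackage.SomeModule]}
  }
}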

View File

@@ -46,6 +46,7 @@ import io.druid.guice.JavaScriptModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.guice.MetadataConfigModule;
import io.druid.guice.ModulesConfig;
import io.druid.guice.ParsersModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
@@ -185,8 +186,6 @@ public class Initialization
+ "is it a local or anonymous class?",
serviceImpl.getClass().getName()
);
} else if (extensionsConfig.getModuleExcludeList().contains(serviceImplName)) {
log.info("Not loading module [%s] because it is present in moduleExcludeList", serviceImplName);
} else if (!implClassNamesToLoad.contains(serviceImplName)) {
log.info(
"Adding implementation [%s] for class [%s] from %s extension",
@@ -390,6 +389,7 @@ public class Initialization
private static class ModuleList
{
private final Injector baseInjector;
private final ModulesConfig modulesConfig;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final List<Module> modules;
@@ -397,6 +397,7 @@ public class Initialization
public ModuleList(Injector baseInjector)
{
this.baseInjector = baseInjector;
this.modulesConfig = baseInjector.getInstance(ModulesConfig.class);
this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class));
this.modules = Lists.newArrayList();
@@ -410,12 +411,21 @@ public class Initialization
public void addModule(Object input)
{
if (input instanceof DruidModule) {
if (!checkModuleClass(input.getClass())) {
return;
}
baseInjector.injectMembers(input);
modules.add(registerJacksonModules(((DruidModule) input)));
} else if (input instanceof Module) {
if (!checkModuleClass(input.getClass())) {
return;
}
baseInjector.injectMembers(input);
modules.add((Module) input);
} else if (input instanceof Class) {
if (!checkModuleClass((Class<?>) input)) {
return;
}
if (DruidModule.class.isAssignableFrom((Class) input)) {
modules.add(registerJacksonModules(baseInjector.getInstance((Class<? extends DruidModule>) input)));
} else if (Module.class.isAssignableFrom((Class) input)) {
@@ -429,6 +439,16 @@
}
}
private boolean checkModuleClass(Class<?> moduleClass)
{
String moduleClassName = moduleClass.getCanonicalName();
if (moduleClassName != null && modulesConfig.getExcludeList().contains(moduleClassName)) {
log.info("Not loading module [%s] because it is present in excludeList", moduleClassName);
return false;
}
return true;
}
public void addModules(Object... object)
{
for (Object o : object) {

View File

@@ -32,6 +32,7 @@ import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DirectDruidClient;
import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
@@ -85,6 +86,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
*/
@LazySingleton
@Path("/druid/v2/")
public class QueryResource implements QueryCountStatsProvider
{

View File

@@ -93,7 +93,7 @@ public class CliHistorical extends ServerRunnable
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.HISTORICAL));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class);
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, HistoricalResource.class);
Jerseys.addResource(binder, SegmentListerResource.class);

View File

@@ -49,6 +49,7 @@ import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.QueryablePeonModule;
import io.druid.guice.annotations.Json;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
@@ -86,13 +87,11 @@ import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.QueryResource;
import io.druid.server.coordination.ServerType;
import io.druid.server.http.SegmentListerResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import io.druid.server.metrics.QueryCountStatsProvider;
import org.eclipse.jetty.server.Server;
import java.io.File;
@@ -210,10 +209,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(CoordinatorClient.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, SegmentListerResource.class);
LifecycleModule.register(binder, QueryResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.fromString(nodeType)));
LifecycleModule.register(binder, Server.class);
}
@@ -271,6 +267,7 @@ public class CliPeon extends GuiceRunnable
return task.getId();
}
},
new QueryablePeonModule(),
new IndexingServiceFirehoseModule(),
new ChatHandlerServerModule(properties),
new LookupModule()

View File

@@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import io.druid.server.QueryResource;
import io.druid.server.metrics.QueryCountStatsProvider;
import java.util.Collections;
import java.util.List;
public class QueryablePeonModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.emptyList();
}
@Override
public void configure(Binder binder)
{
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
}
}
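With the Peon's query-serving bindings extracted into this module, a deployment whose Peons never need to answer queries can, in principle, drop them through the new config described above, e.g. by setting `druid.modules.excludeList=["io.druid.guice.QueryablePeonModule"]` on those nodes (an illustrative setting, not something this patch applies by default; whether other query-related modules can also be excluded depends on the tasks being run).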