mirror of https://github.com/apache/druid.git
Allow injection of node-role set to all non base modules (#13371)
This commit is contained in:
parent
7f4e386509
commit
6ccf31490e
|
@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.Module;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Injector;
|
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.TypeLiteral;
|
|
||||||
import com.google.inject.multibindings.Multibinder;
|
import com.google.inject.multibindings.Multibinder;
|
||||||
import org.apache.druid.discovery.NodeRole;
|
import org.apache.druid.discovery.NodeRole;
|
||||||
import org.apache.druid.guice.JsonConfigProvider;
|
import org.apache.druid.guice.JsonConfigProvider;
|
||||||
|
@ -38,7 +36,6 @@ import org.apache.druid.msq.indexing.DurableStorageCleanerConfig;
|
||||||
import org.apache.druid.storage.StorageConnector;
|
import org.apache.druid.storage.StorageConnector;
|
||||||
import org.apache.druid.storage.StorageConnectorProvider;
|
import org.apache.druid.storage.StorageConnectorProvider;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -55,7 +52,19 @@ public class MSQDurableStorageModule implements DruidModule
|
||||||
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "enable");
|
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "enable");
|
||||||
|
|
||||||
private Properties properties;
|
private Properties properties;
|
||||||
private Injector injector;
|
private Set<NodeRole> nodeRoles;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void setProperties(Properties properties)
|
||||||
|
{
|
||||||
|
this.properties = properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void setNodeRoles(@Self Set<NodeRole> nodeRoles)
|
||||||
|
{
|
||||||
|
this.nodeRoles = nodeRoles;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<? extends Module> getJacksonModules()
|
public List<? extends Module> getJacksonModules()
|
||||||
|
@ -78,8 +87,7 @@ public class MSQDurableStorageModule implements DruidModule
|
||||||
.toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))
|
.toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))
|
||||||
.in(LazySingleton.class);
|
.in(LazySingleton.class);
|
||||||
|
|
||||||
Set<NodeRole> nodeRoles = getNodeRoles(injector);
|
if (nodeRoles.contains(NodeRole.OVERLORD)) {
|
||||||
if (nodeRoles != null && nodeRoles.contains(NodeRole.OVERLORD)) {
|
|
||||||
JsonConfigProvider.bind(
|
JsonConfigProvider.bind(
|
||||||
binder,
|
binder,
|
||||||
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"),
|
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"),
|
||||||
|
@ -93,37 +101,6 @@ public class MSQDurableStorageModule implements DruidModule
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Inject
|
|
||||||
public void setProperties(Properties properties)
|
|
||||||
{
|
|
||||||
this.properties = properties;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public void setInjector(Injector injector)
|
|
||||||
{
|
|
||||||
this.injector = injector;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Nullable
|
|
||||||
private static Set<NodeRole> getNodeRoles(Injector injector)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
return injector.getInstance(
|
|
||||||
Key.get(
|
|
||||||
new TypeLiteral<Set<NodeRole>>()
|
|
||||||
{
|
|
||||||
},
|
|
||||||
Self.class
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isDurableShuffleStorageEnabled()
|
private boolean isDurableShuffleStorageEnabled()
|
||||||
{
|
{
|
||||||
return Boolean.parseBoolean((String) properties.getOrDefault(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false"));
|
return Boolean.parseBoolean((String) properties.getOrDefault(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false"));
|
||||||
|
|
|
@ -23,10 +23,7 @@ import com.fasterxml.jackson.databind.Module;
|
||||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Injector;
|
|
||||||
import com.google.inject.Key;
|
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
import com.google.inject.TypeLiteral;
|
|
||||||
import org.apache.druid.discovery.NodeRole;
|
import org.apache.druid.discovery.NodeRole;
|
||||||
import org.apache.druid.frame.processor.Bouncer;
|
import org.apache.druid.frame.processor.Bouncer;
|
||||||
import org.apache.druid.guice.LazySingleton;
|
import org.apache.druid.guice.LazySingleton;
|
||||||
|
@ -86,8 +83,6 @@ import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
|
||||||
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
|
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
|
||||||
import org.apache.druid.query.DruidProcessingConfig;
|
import org.apache.druid.query.DruidProcessingConfig;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -195,10 +190,9 @@ public class MSQIndexingModule implements DruidModule
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@LazySingleton
|
@LazySingleton
|
||||||
public Bouncer makeBouncer(final DruidProcessingConfig processingConfig, Injector injector)
|
public Bouncer makeBouncer(final DruidProcessingConfig processingConfig, @Self Set<NodeRole> nodeRoles)
|
||||||
{
|
{
|
||||||
Set<NodeRole> nodeRoles = getNodeRoles(injector);
|
if (nodeRoles.contains(NodeRole.PEON) && !nodeRoles.contains(NodeRole.INDEXER)) {
|
||||||
if (null == nodeRoles || (nodeRoles.contains(NodeRole.PEON) && !nodeRoles.contains(NodeRole.INDEXER))) {
|
|
||||||
// CliPeon -> use only one thread regardless of configured # of processing threads. This matches the expected
|
// CliPeon -> use only one thread regardless of configured # of processing threads. This matches the expected
|
||||||
// resource usage pattern for CliPeon-based tasks (one task / one working thread per JVM).
|
// resource usage pattern for CliPeon-based tasks (one task / one working thread per JVM).
|
||||||
return new Bouncer(1);
|
return new Bouncer(1);
|
||||||
|
@ -206,22 +200,4 @@ public class MSQIndexingModule implements DruidModule
|
||||||
return new Bouncer(processingConfig.getNumThreads());
|
return new Bouncer(processingConfig.getNumThreads());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
|
||||||
private static Set<NodeRole> getNodeRoles(Injector injector)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
return injector.getInstance(
|
|
||||||
Key.get(
|
|
||||||
new TypeLiteral<Set<NodeRole>>()
|
|
||||||
{
|
|
||||||
},
|
|
||||||
Self.class
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import com.google.common.collect.Iterables;
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
import com.google.inject.TypeLiteral;
|
import com.google.inject.TypeLiteral;
|
||||||
import com.google.inject.util.Modules;
|
import com.google.inject.util.Modules;
|
||||||
import com.google.inject.util.Providers;
|
import com.google.inject.util.Providers;
|
||||||
|
@ -45,6 +46,7 @@ import org.apache.druid.guice.GuiceInjectors;
|
||||||
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
|
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
|
||||||
import org.apache.druid.guice.JoinableFactoryModule;
|
import org.apache.druid.guice.JoinableFactoryModule;
|
||||||
import org.apache.druid.guice.JsonConfigProvider;
|
import org.apache.druid.guice.JsonConfigProvider;
|
||||||
|
import org.apache.druid.guice.StartupInjectorBuilder;
|
||||||
import org.apache.druid.guice.annotations.EscalatedGlobal;
|
import org.apache.druid.guice.annotations.EscalatedGlobal;
|
||||||
import org.apache.druid.guice.annotations.MSQ;
|
import org.apache.druid.guice.annotations.MSQ;
|
||||||
import org.apache.druid.guice.annotations.Self;
|
import org.apache.druid.guice.annotations.Self;
|
||||||
|
@ -53,6 +55,7 @@ import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
|
||||||
import org.apache.druid.indexing.common.task.CompactionTask;
|
import org.apache.druid.indexing.common.task.CompactionTask;
|
||||||
import org.apache.druid.indexing.common.task.IndexTask;
|
import org.apache.druid.indexing.common.task.IndexTask;
|
||||||
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
|
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
|
||||||
|
import org.apache.druid.initialization.CoreInjectorBuilder;
|
||||||
import org.apache.druid.java.util.common.IOE;
|
import org.apache.druid.java.util.common.IOE;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.Pair;
|
import org.apache.druid.java.util.common.Pair;
|
||||||
|
@ -278,7 +281,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
|
||||||
|
|
||||||
segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO);
|
segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO);
|
||||||
|
|
||||||
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(
|
List<Module> modules = ImmutableList.of(
|
||||||
binder -> {
|
binder -> {
|
||||||
DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
|
DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
|
||||||
{
|
{
|
||||||
|
@ -377,7 +380,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
new MSQExternalDataSourceModule()
|
new MSQExternalDataSourceModule()
|
||||||
));
|
);
|
||||||
|
// adding node role injection to the modules, since CliPeon would also do that through run method
|
||||||
|
Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build(), ImmutableSet.of(NodeRole.PEON))
|
||||||
|
.addAll(modules)
|
||||||
|
.build();
|
||||||
|
|
||||||
objectMapper = setupObjectMapper(injector);
|
objectMapper = setupObjectMapper(injector);
|
||||||
objectMapper.registerModules(sqlModule.getJacksonModules());
|
objectMapper.registerModules(sqlModule.getJacksonModules());
|
||||||
|
|
|
@ -23,11 +23,16 @@ import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Inject;
|
||||||
import org.apache.druid.data.input.MapBasedInputRow;
|
import org.apache.druid.data.input.MapBasedInputRow;
|
||||||
|
import org.apache.druid.discovery.NodeRole;
|
||||||
|
import org.apache.druid.guice.annotations.Self;
|
||||||
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
|
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
|
||||||
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
|
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
|
||||||
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||||
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
|
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
|
||||||
|
import org.apache.druid.initialization.DruidModule;
|
||||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
|
@ -47,14 +52,32 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class HadoopDruidIndexerConfigTest
|
public class HadoopDruidIndexerConfigTest
|
||||||
{
|
{
|
||||||
private static final ObjectMapper JSON_MAPPER;
|
private static final ObjectMapper JSON_MAPPER;
|
||||||
|
// testing member to verify that DruidModule gets node-roles set supplied through the static initialization of
|
||||||
|
// HadoopDruidIndexerConfig class.
|
||||||
|
private static final DruidModule MY_MODULE;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
JSON_MAPPER = new DefaultObjectMapper();
|
JSON_MAPPER = new DefaultObjectMapper();
|
||||||
JSON_MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, JSON_MAPPER));
|
JSON_MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, JSON_MAPPER));
|
||||||
|
MY_MODULE = new DruidModule()
|
||||||
|
{
|
||||||
|
@Inject
|
||||||
|
public void setNodeRoles(@Self Set<NodeRole> nodeRoles)
|
||||||
|
{
|
||||||
|
Assert.assertTrue(nodeRoles.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -22,12 +22,14 @@ package org.apache.druid.indexing.common.task;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import org.apache.druid.guice.ExtensionsConfig;
|
import org.apache.druid.guice.ExtensionsConfig;
|
||||||
import org.apache.druid.guice.ExtensionsLoader;
|
import org.apache.druid.guice.ExtensionsLoader;
|
||||||
import org.apache.druid.guice.StartupInjectorBuilder;
|
import org.apache.druid.guice.StartupInjectorBuilder;
|
||||||
import org.apache.druid.indexing.common.TaskToolbox;
|
import org.apache.druid.indexing.common.TaskToolbox;
|
||||||
|
import org.apache.druid.initialization.ServerInjectorBuilder;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.utils.JvmUtils;
|
import org.apache.druid.utils.JvmUtils;
|
||||||
|
|
||||||
|
@ -50,7 +52,10 @@ public abstract class HadoopTask extends AbstractBatchIndexTask
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(HadoopTask.class);
|
private static final Logger log = new Logger(HadoopTask.class);
|
||||||
|
|
||||||
static final Injector INJECTOR = new StartupInjectorBuilder().forServer().build();
|
static final Injector INJECTOR = new StartupInjectorBuilder()
|
||||||
|
.forServer()
|
||||||
|
.add(ServerInjectorBuilder.registerNodeRoleModule(ImmutableSet.of()))
|
||||||
|
.build();
|
||||||
private static final ExtensionsLoader EXTENSIONS_LOADER = ExtensionsLoader.instance(INJECTOR);
|
private static final ExtensionsLoader EXTENSIONS_LOADER = ExtensionsLoader.instance(INJECTOR);
|
||||||
|
|
||||||
private final List<String> hadoopDependencyCoordinates;
|
private final List<String> hadoopDependencyCoordinates;
|
||||||
|
|
|
@ -270,6 +270,10 @@
|
||||||
<groupId>com.google.inject</groupId>
|
<groupId>com.google.inject</groupId>
|
||||||
<artifactId>guice</artifactId>
|
<artifactId>guice</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.inject.extensions</groupId>
|
||||||
|
<artifactId>guice-multibindings</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-databind</artifactId>
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
|
|
@ -21,11 +21,10 @@ package org.apache.druid.testing.guice;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
import com.google.inject.TypeLiteral;
|
import com.google.inject.multibindings.Multibinder;
|
||||||
import org.apache.druid.curator.CuratorConfig;
|
import org.apache.druid.curator.CuratorConfig;
|
||||||
import org.apache.druid.discovery.NodeRole;
|
import org.apache.druid.discovery.NodeRole;
|
||||||
import org.apache.druid.guice.JsonConfigProvider;
|
import org.apache.druid.guice.JsonConfigProvider;
|
||||||
|
@ -44,8 +43,6 @@ import org.apache.druid.testing.IntegrationTestingConfig;
|
||||||
import org.apache.druid.testing.IntegrationTestingConfigProvider;
|
import org.apache.druid.testing.IntegrationTestingConfigProvider;
|
||||||
import org.apache.druid.testing.IntegrationTestingCuratorConfig;
|
import org.apache.druid.testing.IntegrationTestingCuratorConfig;
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class DruidTestModule implements Module
|
public class DruidTestModule implements Module
|
||||||
|
@ -66,9 +63,7 @@ public class DruidTestModule implements Module
|
||||||
);
|
);
|
||||||
|
|
||||||
// Required for MSQIndexingModule
|
// Required for MSQIndexingModule
|
||||||
binder.bind(new TypeLiteral<Set<NodeRole>>()
|
Multibinder.newSetBinder(binder, NodeRole.class, Self.class).addBinding().toInstance(NodeRole.PEON);
|
||||||
{
|
|
||||||
}).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
|
|
|
@ -20,7 +20,9 @@
|
||||||
package org.apache.druid.initialization;
|
package org.apache.druid.initialization;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
|
@ -45,7 +47,7 @@ import java.util.Set;
|
||||||
*/
|
*/
|
||||||
public class ServerInjectorBuilder
|
public class ServerInjectorBuilder
|
||||||
{
|
{
|
||||||
private Injector baseInjector;
|
private final Injector baseInjector;
|
||||||
private Set<NodeRole> nodeRoles;
|
private Set<NodeRole> nodeRoles;
|
||||||
private Iterable<? extends Module> modules;
|
private Iterable<? extends Module> modules;
|
||||||
|
|
||||||
|
@ -78,7 +80,7 @@ public class ServerInjectorBuilder
|
||||||
|
|
||||||
public ServerInjectorBuilder nodeRoles(final Set<NodeRole> nodeRoles)
|
public ServerInjectorBuilder nodeRoles(final Set<NodeRole> nodeRoles)
|
||||||
{
|
{
|
||||||
this.nodeRoles = nodeRoles;
|
this.nodeRoles = nodeRoles == null ? ImmutableSet.of() : nodeRoles;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +92,9 @@ public class ServerInjectorBuilder
|
||||||
|
|
||||||
public Injector build()
|
public Injector build()
|
||||||
{
|
{
|
||||||
|
Preconditions.checkNotNull(baseInjector);
|
||||||
|
Preconditions.checkNotNull(nodeRoles);
|
||||||
|
|
||||||
Module registerNodeRoleModule = registerNodeRoleModule(nodeRoles);
|
Module registerNodeRoleModule = registerNodeRoleModule(nodeRoles);
|
||||||
|
|
||||||
// Child injector, with the registered node roles
|
// Child injector, with the registered node roles
|
||||||
|
@ -115,9 +120,6 @@ public class ServerInjectorBuilder
|
||||||
|
|
||||||
public static Module registerNodeRoleModule(Set<NodeRole> nodeRoles)
|
public static Module registerNodeRoleModule(Set<NodeRole> nodeRoles)
|
||||||
{
|
{
|
||||||
if (nodeRoles.isEmpty()) {
|
|
||||||
return binder -> {};
|
|
||||||
}
|
|
||||||
return binder -> {
|
return binder -> {
|
||||||
Multibinder<NodeRole> selfBinder = Multibinder.newSetBinder(binder, NodeRole.class, Self.class);
|
Multibinder<NodeRole> selfBinder = Multibinder.newSetBinder(binder, NodeRole.class, Self.class);
|
||||||
nodeRoles.forEach(nodeRole -> selfBinder.addBinding().toInstance(nodeRole));
|
nodeRoles.forEach(nodeRole -> selfBinder.addBinding().toInstance(nodeRole));
|
||||||
|
|
|
@ -22,11 +22,11 @@ package org.apache.druid.server.metrics;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
import com.google.inject.TypeLiteral;
|
|
||||||
import com.google.inject.name.Names;
|
import com.google.inject.name.Names;
|
||||||
import io.timeandspace.cronscheduler.CronScheduler;
|
import io.timeandspace.cronscheduler.CronScheduler;
|
||||||
import org.apache.druid.discovery.NodeRole;
|
import org.apache.druid.discovery.NodeRole;
|
||||||
|
@ -50,7 +50,6 @@ import org.apache.druid.java.util.metrics.NoopSysMonitor;
|
||||||
import org.apache.druid.java.util.metrics.SysMonitor;
|
import org.apache.druid.java.util.metrics.SysMonitor;
|
||||||
import org.apache.druid.query.ExecutorServiceMonitor;
|
import org.apache.druid.query.ExecutorServiceMonitor;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -66,6 +65,13 @@ public class MetricsModule implements Module
|
||||||
{
|
{
|
||||||
static final String MONITORING_PROPERTY_PREFIX = "druid.monitoring";
|
static final String MONITORING_PROPERTY_PREFIX = "druid.monitoring";
|
||||||
private static final Logger log = new Logger(MetricsModule.class);
|
private static final Logger log = new Logger(MetricsModule.class);
|
||||||
|
private Set<NodeRole> nodeRoles;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void setNodeRoles(@Self Set<NodeRole> nodeRoles)
|
||||||
|
{
|
||||||
|
this.nodeRoles = nodeRoles;
|
||||||
|
}
|
||||||
|
|
||||||
public static void register(Binder binder, Class<? extends Monitor> monitorClazz)
|
public static void register(Binder binder, Class<? extends Monitor> monitorClazz)
|
||||||
{
|
{
|
||||||
|
@ -174,14 +180,9 @@ public class MetricsModule implements Module
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@ManageLifecycle
|
@ManageLifecycle
|
||||||
public SysMonitor getSysMonitor(
|
public SysMonitor getSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @Self Set<NodeRole> nodeRoles)
|
||||||
DataSourceTaskIdHolder dataSourceTaskIdHolder,
|
|
||||||
Injector injector
|
|
||||||
)
|
|
||||||
{
|
{
|
||||||
final Set<NodeRole> nodeRoles = getNodeRoles(injector);
|
if (nodeRoles.contains(NodeRole.PEON)) {
|
||||||
|
|
||||||
if (isPeonRole(nodeRoles)) {
|
|
||||||
return new NoopSysMonitor();
|
return new NoopSysMonitor();
|
||||||
} else {
|
} else {
|
||||||
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
|
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
|
||||||
|
@ -191,30 +192,4 @@ public class MetricsModule implements Module
|
||||||
return new SysMonitor(dimensions);
|
return new SysMonitor(dimensions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
|
||||||
private static Set<NodeRole> getNodeRoles(Injector injector)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
return injector.getInstance(
|
|
||||||
Key.get(
|
|
||||||
new TypeLiteral<Set<NodeRole>>()
|
|
||||||
{
|
|
||||||
},
|
|
||||||
Self.class
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isPeonRole(Set<NodeRole> nodeRoles)
|
|
||||||
{
|
|
||||||
if (nodeRoles == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return nodeRoles.contains(NodeRole.PEON);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,6 +154,39 @@ public class ServerInjectorBuilderTest
|
||||||
Assert.assertEquals(expected, injector.getInstance(Key.get(DruidNode.class, Self.class)));
|
Assert.assertEquals(expected, injector.getInstance(Key.get(DruidNode.class, Self.class)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateInjectorWithEmptyNodeRolesAndRoleInjection()
|
||||||
|
{
|
||||||
|
final DruidNode expected = new DruidNode("test-inject", null, false, null, null, true, false);
|
||||||
|
Injector startupInjector = startupInjector();
|
||||||
|
Injector injector = ServerInjectorBuilder.makeServerInjector(
|
||||||
|
startupInjector,
|
||||||
|
ImmutableSet.of(),
|
||||||
|
ImmutableList.of(
|
||||||
|
(com.google.inject.Module) new DruidModule()
|
||||||
|
{
|
||||||
|
@Inject
|
||||||
|
public void setNodeRoles(@Self Set<NodeRole> nodeRoles)
|
||||||
|
{
|
||||||
|
Assert.assertTrue(nodeRoles.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
},
|
||||||
|
binder -> JsonConfigProvider.bindInstance(
|
||||||
|
binder,
|
||||||
|
Key.get(DruidNode.class, Self.class),
|
||||||
|
expected
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Assert.assertNotNull(injector);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateInjectorWithNodeRoleFilter_moduleNotLoaded()
|
public void testCreateInjectorWithNodeRoleFilter_moduleNotLoaded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -28,7 +28,6 @@ import com.google.inject.Injector;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Scopes;
|
import com.google.inject.Scopes;
|
||||||
import com.google.inject.TypeLiteral;
|
|
||||||
import com.google.inject.name.Names;
|
import com.google.inject.name.Names;
|
||||||
import org.apache.druid.discovery.NodeRole;
|
import org.apache.druid.discovery.NodeRole;
|
||||||
import org.apache.druid.guice.GuiceInjectors;
|
import org.apache.druid.guice.GuiceInjectors;
|
||||||
|
@ -37,6 +36,7 @@ import org.apache.druid.guice.LazySingleton;
|
||||||
import org.apache.druid.guice.LifecycleModule;
|
import org.apache.druid.guice.LifecycleModule;
|
||||||
import org.apache.druid.guice.annotations.Self;
|
import org.apache.druid.guice.annotations.Self;
|
||||||
import org.apache.druid.initialization.Initialization;
|
import org.apache.druid.initialization.Initialization;
|
||||||
|
import org.apache.druid.initialization.ServerInjectorBuilder;
|
||||||
import org.apache.druid.jackson.JacksonModule;
|
import org.apache.druid.jackson.JacksonModule;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
|
@ -59,7 +59,6 @@ import org.mockito.Mockito;
|
||||||
import javax.validation.Validation;
|
import javax.validation.Validation;
|
||||||
import javax.validation.Validator;
|
import javax.validation.Validator;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
public class MetricsModuleTest
|
public class MetricsModuleTest
|
||||||
{
|
{
|
||||||
|
@ -125,7 +124,8 @@ public class MetricsModuleTest
|
||||||
@Test
|
@Test
|
||||||
public void testGetBasicMonitorSchedulerByDefault()
|
public void testGetBasicMonitorSchedulerByDefault()
|
||||||
{
|
{
|
||||||
final MonitorScheduler monitorScheduler = createInjector(new Properties()).getInstance(MonitorScheduler.class);
|
final MonitorScheduler monitorScheduler =
|
||||||
|
createInjector(new Properties(), ImmutableSet.of()).getInstance(MonitorScheduler.class);
|
||||||
Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
|
Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +137,8 @@ public class MetricsModuleTest
|
||||||
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
|
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
|
||||||
ClockDriftSafeMonitorScheduler.class.getName()
|
ClockDriftSafeMonitorScheduler.class.getName()
|
||||||
);
|
);
|
||||||
final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
|
final MonitorScheduler monitorScheduler =
|
||||||
|
createInjector(properties, ImmutableSet.of()).getInstance(MonitorScheduler.class);
|
||||||
Assert.assertSame(ClockDriftSafeMonitorScheduler.class, monitorScheduler.getClass());
|
Assert.assertSame(ClockDriftSafeMonitorScheduler.class, monitorScheduler.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,7 +150,8 @@ public class MetricsModuleTest
|
||||||
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
|
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
|
||||||
BasicMonitorScheduler.class.getName()
|
BasicMonitorScheduler.class.getName()
|
||||||
);
|
);
|
||||||
final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
|
final MonitorScheduler monitorScheduler =
|
||||||
|
createInjector(properties, ImmutableSet.of()).getInstance(MonitorScheduler.class);
|
||||||
Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
|
Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +166,7 @@ public class MetricsModuleTest
|
||||||
expectedException.expect(CreationException.class);
|
expectedException.expect(CreationException.class);
|
||||||
expectedException.expectCause(CoreMatchers.instanceOf(IllegalArgumentException.class));
|
expectedException.expectCause(CoreMatchers.instanceOf(IllegalArgumentException.class));
|
||||||
expectedException.expectMessage("Unknown monitor scheduler[UnknownScheduler]");
|
expectedException.expectMessage("Unknown monitor scheduler[UnknownScheduler]");
|
||||||
createInjector(properties).getInstance(MonitorScheduler.class);
|
createInjector(properties, ImmutableSet.of()).getInstance(MonitorScheduler.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -173,24 +175,8 @@ public class MetricsModuleTest
|
||||||
// Do not run the tests on ARM64. Sigar library has no binaries for ARM64
|
// Do not run the tests on ARM64. Sigar library has no binaries for ARM64
|
||||||
Assume.assumeFalse("aarch64".equals(CPU_ARCH));
|
Assume.assumeFalse("aarch64".equals(CPU_ARCH));
|
||||||
|
|
||||||
final NodeRole nodeRole = NodeRole.PEON;
|
final Injector injector = createInjector(new Properties(), ImmutableSet.of(NodeRole.PEON));
|
||||||
final Injector injector = Guice.createInjector(
|
final SysMonitor sysMonitor = injector.getInstance(SysMonitor.class);
|
||||||
new JacksonModule(),
|
|
||||||
new LifecycleModule(),
|
|
||||||
binder -> {
|
|
||||||
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
|
|
||||||
},
|
|
||||||
binder -> {
|
|
||||||
binder.bind(
|
|
||||||
new TypeLiteral<Set<NodeRole>>()
|
|
||||||
{
|
|
||||||
}).annotatedWith(Self.class).toInstance(ImmutableSet.of(nodeRole));
|
|
||||||
}
|
|
||||||
);
|
|
||||||
final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder();
|
|
||||||
injector.injectMembers(dimensionIdHolder);
|
|
||||||
final MetricsModule metricsModule = new MetricsModule();
|
|
||||||
final SysMonitor sysMonitor = metricsModule.getSysMonitor(dimensionIdHolder, injector);
|
|
||||||
final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
|
final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
|
||||||
sysMonitor.doMonitor(emitter);
|
sysMonitor.doMonitor(emitter);
|
||||||
|
|
||||||
|
@ -204,11 +190,8 @@ public class MetricsModuleTest
|
||||||
// Do not run the tests on ARM64. Sigar library has no binaries for ARM64
|
// Do not run the tests on ARM64. Sigar library has no binaries for ARM64
|
||||||
Assume.assumeFalse("aarch64".equals(CPU_ARCH));
|
Assume.assumeFalse("aarch64".equals(CPU_ARCH));
|
||||||
|
|
||||||
final Injector injector = createInjector(new Properties());
|
Injector injector = createInjector(new Properties(), ImmutableSet.of());
|
||||||
final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder();
|
final SysMonitor sysMonitor = injector.getInstance(SysMonitor.class);
|
||||||
injector.injectMembers(dimensionIdHolder);
|
|
||||||
final MetricsModule metricsModule = new MetricsModule();
|
|
||||||
final SysMonitor sysMonitor = metricsModule.getSysMonitor(dimensionIdHolder, injector);
|
|
||||||
final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
|
final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
|
||||||
sysMonitor.doMonitor(emitter);
|
sysMonitor.doMonitor(emitter);
|
||||||
|
|
||||||
|
@ -216,7 +199,7 @@ public class MetricsModuleTest
|
||||||
Mockito.verify(emitter, Mockito.atLeastOnce()).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
|
Mockito.verify(emitter, Mockito.atLeastOnce()).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Injector createInjector(Properties properties)
|
private static Injector createInjector(Properties properties, ImmutableSet<NodeRole> nodeRoles)
|
||||||
{
|
{
|
||||||
return Guice.createInjector(
|
return Guice.createInjector(
|
||||||
new JacksonModule(),
|
new JacksonModule(),
|
||||||
|
@ -227,6 +210,7 @@ public class MetricsModuleTest
|
||||||
binder.bind(ServiceEmitter.class).toInstance(new NoopServiceEmitter());
|
binder.bind(ServiceEmitter.class).toInstance(new NoopServiceEmitter());
|
||||||
binder.bind(Properties.class).toInstance(properties);
|
binder.bind(Properties.class).toInstance(properties);
|
||||||
},
|
},
|
||||||
|
ServerInjectorBuilder.registerNodeRoleModule(nodeRoles),
|
||||||
new MetricsModule()
|
new MetricsModule()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue