Merge pull request elastic/elasticsearch#2934 from rjernst/deguice15

Internal: Remove guice from monitoring

Original commit: elastic/x-pack-elasticsearch@0366cbedd6
This commit is contained in:
Ryan Ernst 2016-07-26 15:07:51 -07:00 committed by GitHub
commit a235c27d80
36 changed files with 279 additions and 518 deletions

View File

@ -45,10 +45,6 @@ public class Licensing implements ActionPlugin {
isTribeNode = isTribeNode(settings);
}
public Collection<Module> nodeModules() {
return Collections.emptyList();
}
@Override
public List<ActionHandler<? extends ActionRequest<?>, ? extends ActionResponse>> getActions() {
if (isTribeNode) {
@ -69,14 +65,6 @@ public class Licensing implements ActionPlugin {
RestDeleteLicenseAction.class);
}
public Collection<Object> createComponents(ClusterService clusterService, Clock clock, Environment environment,
ResourceWatcherService resourceWatcherService,
XPackLicenseState licenseState) {
LicenseService licenseService = new LicenseService(settings, clusterService, clock,
environment, resourceWatcherService, licenseState);
return Arrays.asList(licenseService, licenseState);
}
public List<Setting<?>> getSettings() {
// TODO convert this wildcard to a real setting
return Collections.singletonList(Setting.groupSetting("license.", Setting.Property.NodeScope));

View File

@ -7,28 +7,46 @@ package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.agent.AgentService;
import org.elasticsearch.xpack.monitoring.agent.collector.CollectorModule;
import org.elasticsearch.xpack.monitoring.agent.exporter.ExporterModule;
import org.elasticsearch.xpack.monitoring.agent.collector.Collector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndicesStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.shards.ShardsCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.agent.exporter.http.HttpExporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.local.LocalExporter;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.client.MonitoringClientModule;
import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringBulkAction;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@ -44,12 +62,16 @@ public class Monitoring implements ActionPlugin {
public static final String NAME = "monitoring";
private final Settings settings;
private final Environment env;
private final XPackLicenseState licenseState;
private final boolean enabled;
private final boolean transportClientMode;
private final boolean tribeNode;
public Monitoring(Settings settings) {
public Monitoring(Settings settings, Environment env, XPackLicenseState licenseState) {
this.settings = settings;
this.env = env;
this.licenseState = licenseState;
this.enabled = enabled(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
this.tribeNode = XPackPlugin.isTribeNode(settings);
@ -65,20 +87,41 @@ public class Monitoring implements ActionPlugin {
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>();
modules.add(new MonitoringModule(enabled, transportClientMode));
modules.add(new ExporterModule(settings));
if (enabled && transportClientMode == false && tribeNode == false) {
modules.add(new CollectorModule());
modules.add(new MonitoringClientModule());
}
modules.add(b -> {
XPackPlugin.bindFeatureSet(b, MonitoringFeatureSet.class);
if (transportClientMode || enabled == false || tribeNode) {
b.bind(Exporters.class).toProvider(Providers.of(null));
}
});
return modules;
}
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (enabled == false || transportClientMode || tribeNode) {
public Collection<Object> createComponents(InternalClient client, ThreadPool threadPool, ClusterService clusterService,
LicenseService licenseService) {
if (enabled == false || tribeNode) {
return Collections.emptyList();
}
return Arrays.<Class<? extends LifecycleComponent>>asList(AgentService.class, CleanerService.class);
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
final MonitoringSettings monitoringSettings = new MonitoringSettings(settings, clusterSettings);
final CleanerService cleanerService = new CleanerService(settings, clusterSettings, threadPool, licenseState);
Map<String, Exporter.Factory> exporterFactories = new HashMap<>();
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, env));
exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, cleanerService));
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService);
Set<Collector> collectors = new HashSet<>();
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ClusterStatsCollector(settings, clusterService, monitoringSettings, licenseState, client, licenseService));
collectors.add(new ClusterStateCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState));
collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client));
final AgentService agentService = new AgentService(settings, clusterSettings, collectors, exporters);
return Arrays.asList(agentService, monitoringSettings, exporters, cleanerService);
}
@Override

View File

@ -71,7 +71,7 @@ public class MonitoringFeatureSet implements XPackFeatureSet {
Map<String, Object> usage = new HashMap<>();
for (Exporter exporter : exporters) {
if (exporter.config().enabled()) {
String type = exporter.type();
String type = exporter.config().type();
int count = (Integer) usage.getOrDefault(type, 0);
usage.put(type, count + 1);
}

View File

@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.monitoring.agent.AgentService;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
public class MonitoringModule extends AbstractModule {
private final boolean enabled;
private final boolean transportClientMode;
public MonitoringModule(boolean enabled, boolean transportClientMode) {
this.enabled = enabled;
this.transportClientMode = transportClientMode;
}
@Override
protected void configure() {
XPackPlugin.bindFeatureSet(binder(), MonitoringFeatureSet.class);
if (enabled && transportClientMode == false) {
bind(MonitoringSettings.class).asEagerSingleton();
bind(AgentService.class).asEagerSingleton();
bind(CleanerService.class).asEagerSingleton();
}
}
}

View File

@ -34,7 +34,6 @@ public class MonitoringSettings extends AbstractComponent {
* The minimum amount of time allowed for the history duration.
*/
public static final TimeValue HISTORY_DURATION_MINIMUM = TimeValue.timeValueHours(24);
public static final TimeValue MAX_LICENSE_GRACE_PERIOD = TimeValue.timeValueHours(7 * 24);
/**
* Determines whether monitoring is enabled/disabled
@ -153,7 +152,6 @@ public class MonitoringSettings extends AbstractComponent {
private volatile boolean recoveryActiveOnly;
private volatile String[] indices;
@Inject
public MonitoringSettings(Settings settings, ClusterSettings clusterSettings) {
super(settings);

View File

@ -51,7 +51,6 @@ public class AgentService extends AbstractLifecycleComponent {
private final String[] settingsCollectors;
private final Exporters exporters;
@Inject
public AgentService(Settings settings, ClusterSettings clusterSettings, Set<Collector> collectors, Exporters exporters) {
super(settings);
this.samplingIntervalMillis = MonitoringSettings.INTERVAL.get(settings).millis();

View File

@ -27,7 +27,6 @@ public abstract class AbstractCollector extends AbstractLifecycleComponent imple
protected final MonitoringSettings monitoringSettings;
protected final XPackLicenseState licenseState;
@Inject
public AbstractCollector(Settings settings, String name, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState) {
super(settings);

View File

@ -1,48 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.agent.collector;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndicesStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.shards.ShardsCollector;
import java.util.HashSet;
import java.util.Set;
public class CollectorModule extends AbstractModule {
private final Set<Class<? extends Collector>> collectors = new HashSet<>();
public CollectorModule() {
// Registers default collectors
registerCollector(IndicesStatsCollector.class);
registerCollector(IndexStatsCollector.class);
registerCollector(ClusterStatsCollector.class);
registerCollector(ClusterStateCollector.class);
registerCollector(ShardsCollector.class);
registerCollector(NodeStatsCollector.class);
registerCollector(IndexRecoveryCollector.class);
}
@Override
protected void configure() {
Multibinder<Collector> binder = Multibinder.newSetBinder(binder(), Collector.class);
for (Class<? extends Collector> collector : collectors) {
bind(collector).asEagerSingleton();
binder.addBinding().to(collector);
}
}
public void registerCollector(Class<? extends Collector> collector) {
collectors.add(collector);
}
}

View File

@ -5,13 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.cluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
@ -19,11 +23,6 @@ import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for cluster state.
* <p>
@ -36,7 +35,6 @@ public class ClusterStateCollector extends AbstractCollector {
private final Client client;
@Inject
public ClusterStateCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,13 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.cluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicenseUtils;
@ -21,11 +25,6 @@ import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for cluster stats.
* <p>
@ -43,7 +42,6 @@ public class ClusterStatsCollector extends AbstractCollector {
private final LicenseService licenseService;
private final Client client;
@Inject
public ClusterStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client,
LicenseService licenseService) {

View File

@ -5,12 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.indices;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.XPackLicenseState;
@ -20,12 +25,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for the Recovery API.
* <p>
@ -38,7 +37,6 @@ public class IndexRecoveryCollector extends AbstractCollector {
private final Client client;
@Inject
public IndexRecoveryCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,6 +5,12 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.indices;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.IndicesOptions;
@ -12,7 +18,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.XPackLicenseState;
@ -22,12 +27,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for indices statistics.
* <p>
@ -40,7 +39,6 @@ public class IndexStatsCollector extends AbstractCollector {
private final Client client;
@Inject
public IndexStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,12 +5,15 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.indices;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.XPackLicenseState;
@ -20,10 +23,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
/**
* Collector for indices statistics.
* <p>
@ -35,7 +34,6 @@ public class IndicesStatsCollector extends AbstractCollector {
private final Client client;
@Inject
public IndicesStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.node;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@ -12,20 +16,14 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.bootstrap.BootstrapInfo;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Collection;
import java.util.Collections;
/**
* Collector for nodes statistics.
* <p>
@ -37,29 +35,11 @@ public class NodeStatsCollector extends AbstractCollector {
public static final String NAME = "node-stats-collector";
private final Client client;
private final NodeEnvironment nodeEnvironment;
private final DiskThresholdDecider diskThresholdDecider;
@Inject
public NodeStatsCollector(Settings settings, ClusterService clusterService, MonitoringSettings monitoringSettings,
XPackLicenseState licenseState, InternalClient client,
NodeEnvironment nodeEnvironment, DiskThresholdDecider diskThresholdDecider) {
XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);
this.client = client;
this.nodeEnvironment = nodeEnvironment;
this.diskThresholdDecider = diskThresholdDecider;
}
@Override
protected boolean shouldCollect() {
// In some cases, the collector starts to collect nodes stats but the
// NodeEnvironment is not fully initialized (NodePath is null) and can fail.
// This why we need to check for nodeEnvironment.hasNodeFile() here, but only
// for nodes that can hold data. Client nodes can collect nodes stats because
// elasticsearch correctly handles the nodes stats for client nodes.
return super.shouldCollect()
&& (DiscoveryNode.nodeRequiresLocalStorage(settings) == false || nodeEnvironment.hasNodeFile());
}
@Override
@ -80,12 +60,6 @@ public class NodeStatsCollector extends AbstractCollector {
}
NodeStats nodeStats = response.getNodes().get(0);
// Here we are calling directly the DiskThresholdDecider to retrieve the high watermark value
// It would be nicer to use a settings API like documented in #6732
Double diskThresholdWatermarkHigh = (diskThresholdDecider != null) ? 100.0 - diskThresholdDecider.getFreeDiskThresholdHigh() : -1;
boolean diskThresholdDeciderEnabled = (diskThresholdDecider != null) && diskThresholdDecider.isEnabled();
DiscoveryNode sourceNode = localNode();
NodeStatsMonitoringDoc nodeStatsDoc = new NodeStatsMonitoringDoc(monitoringId(), monitoringVersion());
@ -96,8 +70,6 @@ public class NodeStatsCollector extends AbstractCollector {
nodeStatsDoc.setNodeMaster(isLocalNodeMaster());
nodeStatsDoc.setNodeStats(nodeStats);
nodeStatsDoc.setMlockall(BootstrapInfo.isMemoryLocked());
nodeStatsDoc.setDiskThresholdWaterMarkHigh(diskThresholdWatermarkHigh);
nodeStatsDoc.setDiskThresholdDeciderEnabled(diskThresholdDeciderEnabled);
return Collections.singletonList(nodeStatsDoc);
}

View File

@ -13,10 +13,7 @@ public class NodeStatsMonitoringDoc extends MonitoringDoc {
private String nodeId;
private boolean nodeMaster;
private NodeStats nodeStats;
private boolean mlockall;
private Double diskThresholdWaterMarkHigh;
private boolean diskThresholdDeciderEnabled;
public NodeStatsMonitoringDoc(String monitoringId, String monitoringVersion) {
super(monitoringId, monitoringVersion);
@ -38,14 +35,6 @@ public class NodeStatsMonitoringDoc extends MonitoringDoc {
this.mlockall = mlockall;
}
public void setDiskThresholdWaterMarkHigh(Double diskThresholdWaterMarkHigh) {
this.diskThresholdWaterMarkHigh = diskThresholdWaterMarkHigh;
}
public void setDiskThresholdDeciderEnabled(boolean diskThresholdDeciderEnabled) {
this.diskThresholdDeciderEnabled = diskThresholdDeciderEnabled;
}
public String getNodeId() {
return nodeId;
}
@ -61,13 +50,5 @@ public class NodeStatsMonitoringDoc extends MonitoringDoc {
public boolean isMlockall() {
return mlockall;
}
public Double getDiskThresholdWaterMarkHigh() {
return diskThresholdWaterMarkHigh;
}
public boolean isDiskThresholdDeciderEnabled() {
return diskThresholdDeciderEnabled;
}
}

View File

@ -5,12 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.shards;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
@ -18,12 +23,6 @@ import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for shards.
* <p>
@ -34,7 +33,6 @@ public class ShardsCollector extends AbstractCollector {
public static final String NAME = "shards-collector";
@Inject
public ShardsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -19,7 +19,6 @@ public abstract class Exporter implements AutoCloseable {
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
protected final String type;
protected final Config config;
protected final ESLogger logger;
@ -27,17 +26,12 @@ public abstract class Exporter implements AutoCloseable {
private AtomicBoolean closed = new AtomicBoolean(false);
public Exporter(String type, Config config) {
this.type = type;
public Exporter(Config config) {
this.config = config;
this.logger = config.logger(getClass());
this.bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
}
public String type() {
return type;
}
public String name() {
return config.name;
}
@ -50,6 +44,11 @@ public abstract class Exporter implements AutoCloseable {
return false;
}
/** Returns true if only one instance of this exporter should be allowed. */
public boolean isSingleton() {
return false;
}
/**
* Opens up a new export bulk. May return {@code null} indicating this exporter is not ready
* yet to export the docs
@ -76,12 +75,14 @@ public abstract class Exporter implements AutoCloseable {
public static class Config {
private final String name;
private final String type;
private final boolean enabled;
private final Settings globalSettings;
private final Settings settings;
public Config(String name, Settings globalSettings, Settings settings) {
public Config(String name, String type, Settings globalSettings, Settings settings) {
this.name = name;
this.type = type;
this.globalSettings = globalSettings;
this.settings = settings;
this.enabled = settings.getAsBoolean("enabled", true);
@ -91,6 +92,10 @@ public abstract class Exporter implements AutoCloseable {
return name;
}
public String type() {
return type;
}
public boolean enabled() {
return enabled;
}
@ -104,24 +109,10 @@ public abstract class Exporter implements AutoCloseable {
}
}
public abstract static class Factory<E extends Exporter> {
/** A factory for constructing {@link Exporter} instances.*/
public interface Factory {
private final String type;
private final boolean singleton;
public Factory(String type, boolean singleton) {
this.type = type;
this.singleton = singleton;
}
public String type() {
return type;
}
public boolean singleton() {
return singleton;
}
public abstract E create(Config config);
/** Create an exporter with the given configuration. */
Exporter create(Config config);
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.agent.exporter;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.monitoring.Monitoring;
import org.elasticsearch.xpack.monitoring.agent.exporter.http.HttpExporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.local.LocalExporter;
import java.util.HashMap;
import java.util.Map;
public class ExporterModule extends AbstractModule {
private final Settings settings;
private final Map<String, Class<? extends Exporter.Factory<? extends Exporter>>> exporterFactories = new HashMap<>();
public ExporterModule(Settings settings) {
this.settings = settings;
registerExporter(HttpExporter.TYPE, HttpExporter.Factory.class);
registerExporter(LocalExporter.TYPE, LocalExporter.Factory.class);
}
@Override
protected void configure() {
if (Monitoring.enabled(settings) && XPackPlugin.transportClientMode(settings) == false
&& XPackPlugin.isTribeNode(settings) == false) {
bind(Exporters.class).asEagerSingleton();
MapBinder<String, Exporter.Factory> factoryBinder = MapBinder.newMapBinder(binder(), String.class, Exporter.Factory.class);
for (Map.Entry<String, Class<? extends Exporter.Factory<? extends Exporter>>> entry : exporterFactories.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
factoryBinder.addBinding(entry.getKey()).to(entry.getValue());
}
} else {
bind(Exporters.class).toProvider(Providers.of(null));
}
}
public void registerExporter(String type, Class<? extends Exporter.Factory<? extends Exporter>> factory) {
exporterFactories.put(type, factory);
}
}

View File

@ -8,9 +8,7 @@ package org.elasticsearch.xpack.monitoring.agent.exporter;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.node.Node;
@ -39,16 +37,15 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
private final AtomicReference<Map<String, Exporter>> exporters;
@Inject
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
ClusterService clusterService,
ClusterSettings clusterSettings) {
ClusterService clusterService) {
super(settings);
this.factories = factories;
this.clusterService = clusterService;
this.exporters = new AtomicReference<>(emptyMap());
clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS, this::setExportersSetting);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS,
this::setExportersSetting);
}
private void setExportersSetting(Settings exportersSetting) {
@ -135,7 +132,7 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
if (factory == null) {
throw new SettingsException("unknown exporter type [" + type + "] set for exporter [" + name + "]");
}
Exporter.Config config = new Exporter.Config(name, globalSettings, exporterSettings);
Exporter.Config config = new Exporter.Config(name, type, globalSettings, exporterSettings);
if (!config.enabled()) {
hasDisabled = true;
if (logger.isDebugEnabled()) {
@ -143,8 +140,9 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
}
continue;
}
if (factory.singleton()) {
// this is a singleton exporter factory, let's make sure we didn't already registered one
Exporter exporter = factory.create(config);
if (exporter.isSingleton()) {
// this is a singleton exporter, let's make sure we didn't already create one
// (there can only be one instance of a singleton exporter)
if (singletons.contains(type)) {
throw new SettingsException("multiple [" + type + "] exporters are configured. there can " +
@ -152,7 +150,7 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
}
singletons.add(type);
}
exporters.put(config.name(), factory.create(config));
exporters.put(config.name(), exporter);
}
// no exporters are configured, lets create a default local one.
@ -161,7 +159,8 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
// fallback on the default
//
if (exporters.isEmpty() && !hasDisabled) {
Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, globalSettings, Settings.EMPTY);
Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, LocalExporter.TYPE,
globalSettings, Settings.EMPTY);
exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config));
}

View File

@ -111,9 +111,8 @@ public class HttpExporter extends Exporter {
final ConnectionKeepAliveWorker keepAliveWorker;
Thread keepAliveThread;
public HttpExporter(Exporter.Config config, Environment env) {
super(TYPE, config);
public HttpExporter(Config config, Environment env) {
super(config);
this.env = env;
hosts = config.settings().getAsArray(HOST_SETTING, Strings.EMPTY_ARRAY);
@ -754,20 +753,4 @@ public class HttpExporter extends Exporter {
}
}
}
public static class Factory extends Exporter.Factory<HttpExporter> {
private final Environment env;
@Inject
public Factory(Environment env) {
super(TYPE, false);
this.env = env;
}
@Override
public HttpExporter create(Config config) {
return new HttpExporter(config, env);
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.ExportException;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.agent.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.agent.resolver.ResolversRegistry;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Arrays;
import java.util.Collection;
@ -28,13 +29,13 @@ import java.util.Collection;
public class LocalBulk extends ExportBulk {
private final ESLogger logger;
private final ClientProxy client;
private final InternalClient client;
private final ResolversRegistry resolvers;
private BulkRequestBuilder requestBuilder;
public LocalBulk(String name, ESLogger logger, ClientProxy client, ResolversRegistry resolvers) {
public LocalBulk(String name, ESLogger logger, InternalClient client, ResolversRegistry resolvers) {
super(name);
this.logger = logger;
this.client = client;

View File

@ -51,16 +51,16 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
public static final String TYPE = "local";
private final ClientProxy client;
private final InternalClient client;
private final ClusterService clusterService;
private final ResolversRegistry resolvers;
private final CleanerService cleanerService;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
public LocalExporter(Exporter.Config config, ClientProxy client,
public LocalExporter(Exporter.Config config, InternalClient client,
ClusterService clusterService, CleanerService cleanerService) {
super(TYPE, config);
super(config);
this.client = client;
this.clusterService = clusterService;
this.cleanerService = cleanerService;
@ -302,26 +302,6 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
});
}
public static class Factory extends Exporter.Factory<LocalExporter> {
private final ClientProxy client;
private final ClusterService clusterService;
private final CleanerService cleanerService;
@Inject
public Factory(InternalClient client, ClusterService clusterService, CleanerService cleanerService) {
super(TYPE, true);
this.client = new ClientProxy(client);
this.clusterService = clusterService;
this.cleanerService = cleanerService;
}
@Override
public LocalExporter create(Config config) {
return new LocalExporter(config, client, clusterService, cleanerService);
}
}
enum State {
INITIALIZED,
RUNNING,

View File

@ -28,8 +28,6 @@ public class NodeStatsResolver extends MonitoringIndexNameResolver.Timestamped<N
"node_stats.node_id",
"node_stats.node_master",
"node_stats.mlockall",
"node_stats.disk_threshold_enabled",
"node_stats.disk_threshold_watermark_high",
// Node Stats
"node_stats.indices.docs.count",
"node_stats.indices.fielddata.memory_size_in_bytes",
@ -119,8 +117,6 @@ public class NodeStatsResolver extends MonitoringIndexNameResolver.Timestamped<N
builder.field(Fields.NODE_ID, document.getNodeId());
builder.field(Fields.NODE_MASTER, document.isNodeMaster());
builder.field(Fields.MLOCKALL, document.isMlockall());
builder.field(Fields.DISK_THRESHOLD_ENABLED, document.isDiskThresholdDeciderEnabled());
builder.field(Fields.DISK_THRESHOLD_WATERMARK_HIGH, document.getDiskThresholdWaterMarkHigh());
NodeStats nodeStats = document.getNodeStats();
if (nodeStats != null) {
@ -135,7 +131,5 @@ public class NodeStatsResolver extends MonitoringIndexNameResolver.Timestamped<N
static final String NODE_ID = "node_id";
static final String NODE_MASTER = "node_master";
static final String MLOCKALL = "mlockall";
static final String DISK_THRESHOLD_ENABLED = "disk_threshold_enabled";
static final String DISK_THRESHOLD_WATERMARK_HIGH = "disk_threshold_watermark_high";
}
}

View File

@ -5,8 +5,11 @@
*/
package org.elasticsearch.xpack.monitoring.cleaner;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -19,10 +22,6 @@ import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
* {@code CleanerService} takes care of deleting old monitoring indices.
*/
@ -49,7 +48,6 @@ public class CleanerService extends AbstractLifecycleComponent {
clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.HISTORY_DURATION, this::setGlobalRetention);
}
@Inject
public CleanerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, XPackLicenseState licenseState) {
this(settings, clusterSettings, licenseState, threadPool, new DefaultExecutionScheduler());
}

View File

@ -1,16 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.client;
import org.elasticsearch.common.inject.AbstractModule;
public class MonitoringClientModule extends AbstractModule {
@Override
protected void configure() {
bind(MonitoringClient.class).asEagerSingleton();
}
}

View File

@ -435,12 +435,6 @@
"mlockall": {
"type": "boolean"
},
"disk_threshold_enabled": {
"type": "boolean"
},
"disk_threshold_watermark_high": {
"type": "short"
},
"indices": {
"properties": {
"docs": {

View File

@ -128,10 +128,10 @@ public class MonitoringFeatureSetTests extends ESTestCase {
private Exporter mockExporter(String type, boolean enabled) {
Exporter exporter = mock(Exporter.class);
when(exporter.type()).thenReturn(type);
Exporter.Config enabledConfig = mock(Exporter.Config.class);
when(enabledConfig.enabled()).thenReturn(enabled);
when(exporter.config()).thenReturn(enabledConfig);
when(enabledConfig.type()).thenReturn(type);
return exporter;
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.is;
@ -16,10 +17,11 @@ public class MonitoringPluginClientTests extends ESTestCase {
public void testModulesWithClientSettings() {
Settings settings = Settings.builder()
.put("path.home", createTempDir())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE)
.build();
Monitoring plugin = new Monitoring(settings);
Monitoring plugin = new Monitoring(settings, new Environment(settings), null);
assertThat(plugin.isEnabled(), is(true));
assertThat(plugin.isTransportClient(), is(true));
}
@ -27,9 +29,10 @@ public class MonitoringPluginClientTests extends ESTestCase {
public void testModulesWithNodeSettings() {
// these settings mimic what ES does when running as a node...
Settings settings = Settings.builder()
.put("path.home", createTempDir())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), "node")
.build();
Monitoring plugin = new Monitoring(settings);
Monitoring plugin = new Monitoring(settings, new Environment(settings), null);
assertThat(plugin.isEnabled(), is(true));
assertThat(plugin.isTransportClient(), is(false));
}

View File

@ -104,7 +104,7 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
fail("should have thrown an exception about missing implementation");
} catch (Exception ce) {
assertThat("message contains error about missing implementation: " + ce.getMessage(),
ce.getMessage().contains("No implementation"), equalTo(true));
ce.getMessage().contains("Could not find a suitable constructor"), equalTo(true));
}
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -42,6 +43,8 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -87,9 +90,12 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
CapturingTransport transport = new CapturingTransport();
Set<Setting<?>> clusterSettings = new HashSet<>();
clusterSettings.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings.add(MonitoringSettings.EXPORTERS_SETTINGS);
clusterService = new ClusterService(Settings.builder().put("cluster.name",
TransportMonitoringBulkActionTests.class.getName()).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
new ClusterSettings(Settings.EMPTY, clusterSettings), threadPool);
clusterService.setLocalNode(new DiscoveryNode("node", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(),
Version.CURRENT));
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@ -257,8 +263,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private final Collection<MonitoringDoc> exported = ConcurrentCollections.newConcurrentSet();
public CapturingExporters() {
super(Settings.EMPTY, Collections.emptyMap(), clusterService,
new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.EXPORTERS_SETTINGS)));
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
}
@Override
@ -279,8 +284,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private final Consumer<Collection<? extends MonitoringDoc>> consumer;
public ConsumingExporters(Consumer<Collection<? extends MonitoringDoc>> consumer) {
super(Settings.EMPTY, Collections.emptyMap(), clusterService,
new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.EXPORTERS_SETTINGS)));
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
this.consumer = consumer;
}

View File

@ -56,8 +56,6 @@ public class NodeStatsCollectorTests extends AbstractCollectorTestCase {
equalTo(internalCluster().getInstance(ClusterService.class, node).localNode().getId()));
assertThat(nodeStatsMonitoringDoc.isNodeMaster(), equalTo(node.equals(internalCluster().getMasterName())));
assertThat(nodeStatsMonitoringDoc.isMlockall(), equalTo(BootstrapInfo.isMemoryLocked()));
assertNotNull(nodeStatsMonitoringDoc.isDiskThresholdDeciderEnabled());
assertNotNull(nodeStatsMonitoringDoc.getDiskThresholdWaterMarkHigh());
assertNotNull(nodeStatsMonitoringDoc.getNodeStats());
}
@ -68,8 +66,6 @@ public class NodeStatsCollectorTests extends AbstractCollectorTestCase {
internalCluster().getInstance(ClusterService.class, nodeId),
internalCluster().getInstance(MonitoringSettings.class, nodeId),
internalCluster().getInstance(XPackLicenseState.class, nodeId),
internalCluster().getInstance(InternalClient.class, nodeId),
internalCluster().getInstance(NodeEnvironment.class, nodeId),
internalCluster().getInstance(DiskThresholdDecider.class, nodeId));
internalCluster().getInstance(InternalClient.class, nodeId));
}
}

View File

@ -6,14 +6,18 @@
package org.elasticsearch.xpack.monitoring.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.Collector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Map;
@ -114,8 +118,14 @@ public abstract class AbstractExporterTemplateTestCase extends MonitoringIntegTe
}
protected void doExporting() throws Exception {
Collector collector = internalCluster().getInstance(ClusterStatsCollector.class);
assertNotNull(collector);
// TODO: these should be unit tests, not using guice
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
XPackLicenseState licenseState = internalCluster().getInstance(XPackLicenseState.class);
LicenseService licenseService = internalCluster().getInstance(LicenseService.class);
InternalClient client = internalCluster().getInstance(InternalClient.class);
Collector collector = new ClusterStatsCollector(clusterService.getSettings(), clusterService,
new MonitoringSettings(clusterService.getSettings(), clusterService.getClusterSettings()),
licenseState, client, licenseService);
Exporters exporters = internalCluster().getInstance(Exporters.class);
assertNotNull(exporters);

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.exporter.local.LocalExporter;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.security.InternalClient;
import org.junit.Before;
import java.util.ArrayList;
@ -64,21 +65,21 @@ public class ExportersTests extends ESTestCase {
public void init() throws Exception {
factories = new HashMap<>();
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
InternalClient client = mock(InternalClient.class);
clusterService = mock(ClusterService.class);
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(MonitoringSettings.COLLECTORS,
MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
// we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(ClientProxy.fromClient(client), clusterService,
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService,
mock(CleanerService.class)));
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(MonitoringSettings.COLLECTORS,
MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));
exporters = new Exporters(Settings.EMPTY, factories, clusterService, clusterSettings);
exporters = new Exporters(Settings.EMPTY, factories, clusterService);
}
public void testInitExportersDefault() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder().build());
assertThat(internalExporters, notNullValue());
@ -88,8 +89,7 @@ public class ExportersTests extends ESTestCase {
}
public void testInitExportersSingle() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.build());
@ -97,13 +97,12 @@ public class ExportersTests extends ESTestCase {
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.size(), is(1));
assertThat(internalExporters, hasKey("_name"));
assertThat(internalExporters.get("_name"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name").type, is("_type"));
assertThat(internalExporters.get("_name"), instanceOf(TestExporter.class));
assertThat(internalExporters.get("_name").config().type(), is("_type"));
}
public void testInitExportersSingleDisabled() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.put("_name.enabled", false)
@ -138,8 +137,7 @@ public class ExportersTests extends ESTestCase {
}
public void testInitExportersMultipleSameType() throws Exception {
Exporter.Factory factory = new TestFactory("_type", false);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
@ -148,30 +146,26 @@ public class ExportersTests extends ESTestCase {
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.size(), is(2));
assertThat(internalExporters, hasKey("_name0"));
assertThat(internalExporters.get("_name0"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name0").type, is("_type"));
assertThat(internalExporters.get("_name0"), instanceOf(TestExporter.class));
assertThat(internalExporters.get("_name0").config().type(), is("_type"));
assertThat(internalExporters, hasKey("_name1"));
assertThat(internalExporters.get("_name1"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name1").type, is("_type"));
assertThat(internalExporters.get("_name1"), instanceOf(TestExporter.class));
assertThat(internalExporters.get("_name1").config().type(), is("_type"));
}
public void testInitExportersMultipleSameTypeSingletons() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
try {
factories.put("_type", TestSingletonExporter::new);
SettingsException e = expectThrows(SettingsException.class, () ->
exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
.build());
fail("Expected SettingsException");
} catch (SettingsException e) {
assertThat(e.getMessage(), containsString("multiple [_type] exporters are configured. there can only be one"));
}
.build())
);
assertThat(e.getMessage(), containsString("multiple [_type] exporters are configured. there can only be one"));
}
public void testSettingsUpdate() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", false));
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
final AtomicReference<Settings> settingsHolder = new AtomicReference<>();
@ -180,8 +174,9 @@ public class ExportersTests extends ESTestCase {
.put("xpack.monitoring.collection.exporters._name1.type", "_type")
.build();
clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Arrays.asList(MonitoringSettings.EXPORTERS_SETTINGS)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
exporters = new Exporters(nodeSettings, factories, clusterService, clusterSettings) {
exporters = new Exporters(nodeSettings, factories, clusterService) {
@Override
Map<String, Exporter> initExporters(Settings settings) {
settingsHolder.set(settings);
@ -211,14 +206,14 @@ public class ExportersTests extends ESTestCase {
}
public void testOpenBulkOnMaster() throws Exception {
Exporter.Factory factory = new MockFactory("mock", false);
Exporter.Factory masterOnlyFactory = new MockFactory("mock_master_only", true);
Exporter.Factory factory = new MockFactory(false);
Exporter.Factory masterOnlyFactory = new MockFactory(true);
factories.put("mock", factory);
factories.put("mock_master_only", masterOnlyFactory);
Exporters exporters = new Exporters(Settings.builder()
.put("xpack.monitoring.collection.exporters._name0.type", "mock")
.put("xpack.monitoring.collection.exporters._name1.type", "mock_master_only")
.build(), factories, clusterService, clusterSettings);
.build(), factories, clusterService);
exporters.start();
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
@ -236,14 +231,14 @@ public class ExportersTests extends ESTestCase {
}
public void testExportNotOnMaster() throws Exception {
Exporter.Factory factory = new MockFactory("mock", false);
Exporter.Factory masterOnlyFactory = new MockFactory("mock_master_only", true);
Exporter.Factory factory = new MockFactory(false);
Exporter.Factory masterOnlyFactory = new MockFactory(true);
factories.put("mock", factory);
factories.put("mock_master_only", masterOnlyFactory);
Exporters exporters = new Exporters(Settings.builder()
.put("xpack.monitoring.collection.exporters._name0.type", "mock")
.put("xpack.monitoring.collection.exporters._name1.type", "mock_master_only")
.build(), factories, clusterService, clusterSettings);
.build(), factories, clusterService);
exporters.start();
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
@ -257,6 +252,7 @@ public class ExportersTests extends ESTestCase {
verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).openBulk();
verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verify(exporters.getExporter("_name1"), times(1)).isSingleton();
verifyNoMoreInteractions(exporters.getExporter("_name1"));
}
@ -273,10 +269,9 @@ public class ExportersTests extends ESTestCase {
settings.put("xpack.monitoring.collection.exporters._name" + String.valueOf(i) + ".type", "record");
}
Exporter.Factory factory = new CountingExportFactory("record", false);
factories.put("record", factory);
factories.put("record", CountingExporter::new);
Exporters exporters = new Exporters(settings.build(), factories, clusterService, clusterSettings);
Exporters exporters = new Exporters(settings.build(), factories, clusterService);
exporters.start();
final Thread[] threads = new Thread[3 + randomInt(7)];
@ -327,51 +322,50 @@ public class ExportersTests extends ESTestCase {
assertThat(exceptions, empty());
for (Exporter exporter : exporters) {
assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class));
assertThat(((CountingExportFactory.CountingExporter) exporter).getExportedCount(), equalTo(total));
assertThat(exporter, instanceOf(CountingExporter.class));
assertThat(((CountingExporter) exporter).getExportedCount(), equalTo(total));
}
exporters.close();
}
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
public TestFactory(String type, boolean singleton) {
super(type, singleton);
static class TestExporter extends Exporter {
public TestExporter(Config config) {
super(config);
}
@Override
public TestExporter create(Exporter.Config config) {
return new TestExporter(type(), config);
public ExportBulk openBulk() {
return mock(ExportBulk.class);
}
static class TestExporter extends Exporter {
public TestExporter(String type, Config config) {
super(type, config);
}
@Override
public ExportBulk openBulk() {
return mock(ExportBulk.class);
}
@Override
public void doClose() {
}
@Override
public void doClose() {
}
}
static class MockFactory extends Exporter.Factory<Exporter> {
static class TestSingletonExporter extends TestExporter {
TestSingletonExporter(Config config) {
super(config);
}
@Override
public boolean isSingleton() {
return true;
}
}
static class MockFactory implements Exporter.Factory {
private final boolean masterOnly;
public MockFactory(String type, boolean masterOnly) {
super(type, false);
public MockFactory(boolean masterOnly) {
this.masterOnly = masterOnly;
}
@Override
public Exporter create(Exporter.Config config) {
Exporter exporter = mock(Exporter.class);
when(exporter.type()).thenReturn(type());
when(exporter.name()).thenReturn(config.name());
when(exporter.masterOnly()).thenReturn(masterOnly);
when(exporter.openBulk()).thenReturn(mock(ExportBulk.class));
@ -379,73 +373,58 @@ public class ExportersTests extends ESTestCase {
}
}
/**
* A factory of exporters that count the number of exported documents.
*/
static class CountingExportFactory extends Exporter.Factory<CountingExportFactory.CountingExporter> {
static class CountingExporter extends Exporter {
public CountingExportFactory(String type, boolean singleton) {
super(type, singleton);
private static final AtomicInteger count = new AtomicInteger(0);
private List<CountingBulk> bulks = new CopyOnWriteArrayList<>();
public CountingExporter(Config config) {
super(config);
}
@Override
public CountingExporter create(Exporter.Config config) {
return new CountingExporter(type(), config);
public ExportBulk openBulk() {
CountingBulk bulk = new CountingBulk(config.type() + "#" + count.getAndIncrement());
bulks.add(bulk);
return bulk;
}
static class CountingExporter extends Exporter {
private static final AtomicInteger count = new AtomicInteger(0);
private List<CountingBulk> bulks = new CopyOnWriteArrayList<>();
public CountingExporter(String type, Config config) {
super(type, config);
}
@Override
public ExportBulk openBulk() {
CountingBulk bulk = new CountingBulk(type + "#" + count.getAndIncrement());
bulks.add(bulk);
return bulk;
}
@Override
public void doClose() {
}
public int getExportedCount() {
int exported = 0;
for (CountingBulk bulk : bulks) {
exported += bulk.getCount();
}
return exported;
}
@Override
public void doClose() {
}
static class CountingBulk extends ExportBulk {
private final AtomicInteger count = new AtomicInteger();
public CountingBulk(String name) {
super(name);
public int getExportedCount() {
int exported = 0;
for (CountingBulk bulk : bulks) {
exported += bulk.getCount();
}
return exported;
}
}
@Override
protected void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
count.addAndGet(docs.size());
}
static class CountingBulk extends ExportBulk {
@Override
protected void doFlush() {
}
private final AtomicInteger count = new AtomicInteger();
@Override
protected void doClose() throws ExportException {
}
public CountingBulk(String name) {
super(name);
}
int getCount() {
return count.get();
}
@Override
protected void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
count.addAndGet(docs.size());
}
@Override
protected void doFlush() {
}
@Override
protected void doClose() throws ExportException {
}
int getCount() {
return count.get();
}
}
}

View File

@ -63,8 +63,6 @@ public class NodeStatsResolverTests extends MonitoringIndexNameResolverTestCase<
doc.setSourceNode(new DiscoveryNode("id", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT));
doc.setNodeMaster(randomBoolean());
doc.setNodeId(UUID.randomUUID().toString());
doc.setDiskThresholdDeciderEnabled(randomBoolean());
doc.setDiskThresholdWaterMarkHigh(randomDouble());
doc.setMlockall(randomBoolean());
doc.setNodeStats(randomNodeStats());
return doc;

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.Licensing;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
@ -139,7 +140,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
this.licensing = new Licensing(settings);
this.security = new Security(settings, env, licenseState);
this.monitoring = new Monitoring(settings);
this.monitoring = new Monitoring(settings, env, licenseState);
this.watcher = new Watcher(settings);
this.graph = new Graph(settings);
this.notification = new Notification(settings);
@ -173,8 +174,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
if (transportClientMode == false) {
modules.add(new TextTemplateModule());
// Note: this only exists so LicenseService subclasses can be bound in mock tests
modules.addAll(licensing.nodeModules());
} else {
modules.add(b -> b.bind(XPackLicenseState.class).toProvider(Providers.of(null)));
}
@ -185,7 +184,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
ArrayList<Class<? extends LifecycleComponent>> services = new ArrayList<>();
services.addAll(notification.nodeServices());
services.addAll(monitoring.nodeServices());
return services;
}
@ -196,9 +194,14 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
final InternalClient internalClient = new InternalClient(settings, threadPool, client, security.getCryptoService());
components.add(internalClient);
components.addAll(licensing.createComponents(clusterService, getClock(), env, resourceWatcherService, licenseState));
LicenseService licenseService = new LicenseService(settings, clusterService, getClock(),
env, resourceWatcherService, licenseState);
components.add(licenseService);
components.add(licenseState);
components.addAll(security.createComponents(internalClient, threadPool, clusterService, resourceWatcherService,
extensionsService.getExtensions()));
components.addAll(monitoring.createComponents(internalClient, threadPool, clusterService, licenseService));
// watcher http stuff
Map<String, HttpAuthFactory> httpAuthFactories = new HashMap<>();

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -18,6 +19,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
@ -192,8 +194,10 @@ public class IndexActionTests extends ESIntegTestCase {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout);
}
builder.endObject();
Client client = client();
InternalClient internalClient = new InternalClient(client.settings(), client.threadPool(), client, null);
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.fromClient(client()));
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, internalClient);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -221,7 +225,10 @@ public class IndexActionTests extends ESIntegTestCase {
}
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY,ClientProxy.fromClient(client()));
Client client = client();
InternalClient internalClient = new InternalClient(client.settings(), client.threadPool(), client, null);
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, internalClient);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
try {