Internal: Remove guice from monitoring

This change removes guice from construction of monitoring classes.
Additionally, it removes disk threshold watermark and enabled setting
from node stats collector. These were not node stats, just cluster
settings. If we want to add back actual percentage of disk threshold
used, it should be in node stats directly.

Original commit: elastic/x-pack-elasticsearch@4cd49557cf
This commit is contained in:
Ryan Ernst 2016-07-26 11:25:22 -07:00
parent 4d063eddbd
commit b02b30ee0a
36 changed files with 282 additions and 517 deletions

View File

@ -57,10 +57,6 @@ public class Licensing implements ActionPlugin {
isTribeNode = isTribeNode(settings);
}
public Collection<Module> nodeModules() {
return Collections.emptyList();
}
@Override
public List<ActionHandler<? extends ActionRequest<?>, ? extends ActionResponse>> getActions() {
if (isTribeNode) {
@ -81,14 +77,6 @@ public class Licensing implements ActionPlugin {
RestDeleteLicenseAction.class);
}
public Collection<Object> createComponents(ClusterService clusterService, Clock clock, Environment environment,
ResourceWatcherService resourceWatcherService,
XPackLicenseState licenseState) {
LicenseService licenseService = new LicenseService(settings, clusterService, clock,
environment, resourceWatcherService, licenseState);
return Arrays.asList(licenseService, licenseState);
}
public List<Setting<?>> getSettings() {
// TODO convert this wildcard to a real setting
return Collections.singletonList(Setting.groupSetting("license.", Setting.Property.NodeScope));

View File

@ -7,28 +7,46 @@ package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.license.plugin.core.LicenseService;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.agent.AgentService;
import org.elasticsearch.xpack.monitoring.agent.collector.CollectorModule;
import org.elasticsearch.xpack.monitoring.agent.exporter.ExporterModule;
import org.elasticsearch.xpack.monitoring.agent.collector.Collector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndicesStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.shards.ShardsCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.agent.exporter.http.HttpExporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.local.LocalExporter;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.client.MonitoringClientModule;
import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringBulkAction;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@ -44,12 +62,16 @@ public class Monitoring implements ActionPlugin {
public static final String NAME = "monitoring";
private final Settings settings;
private final Environment env;
private final XPackLicenseState licenseState;
private final boolean enabled;
private final boolean transportClientMode;
private final boolean tribeNode;
public Monitoring(Settings settings) {
public Monitoring(Settings settings, Environment env, XPackLicenseState licenseState) {
this.settings = settings;
this.env = env;
this.licenseState = licenseState;
this.enabled = enabled(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
this.tribeNode = XPackPlugin.isTribeNode(settings);
@ -65,20 +87,41 @@ public class Monitoring implements ActionPlugin {
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>();
modules.add(new MonitoringModule(enabled, transportClientMode));
modules.add(new ExporterModule(settings));
if (enabled && transportClientMode == false && tribeNode == false) {
modules.add(new CollectorModule());
modules.add(new MonitoringClientModule());
}
modules.add(b -> {
XPackPlugin.bindFeatureSet(b, MonitoringFeatureSet.class);
if (transportClientMode || enabled == false || tribeNode) {
b.bind(Exporters.class).toProvider(Providers.of(null));
}
});
return modules;
}
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (enabled == false || transportClientMode || tribeNode) {
public Collection<Object> createComponents(InternalClient client, ThreadPool threadPool, ClusterService clusterService,
LicenseService licenseService) {
if (enabled == false || tribeNode) {
return Collections.emptyList();
}
return Arrays.<Class<? extends LifecycleComponent>>asList(AgentService.class, CleanerService.class);
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
final MonitoringSettings monitoringSettings = new MonitoringSettings(settings, clusterSettings);
final CleanerService cleanerService = new CleanerService(settings, clusterSettings, threadPool, licenseState);
Map<String, Exporter.Factory> exporterFactories = new HashMap<>();
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, env));
exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, cleanerService));
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService);
Set<Collector> collectors = new HashSet<>();
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ClusterStatsCollector(settings, clusterService, monitoringSettings, licenseState, client, licenseService));
collectors.add(new ClusterStateCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState));
collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client));
final AgentService agentService = new AgentService(settings, clusterSettings, collectors, exporters);
return Arrays.asList(agentService, monitoringSettings, exporters, cleanerService);
}
@Override

View File

@ -71,7 +71,7 @@ public class MonitoringFeatureSet implements XPackFeatureSet {
Map<String, Object> usage = new HashMap<>();
for (Exporter exporter : exporters) {
if (exporter.config().enabled()) {
String type = exporter.type();
String type = exporter.config().type();
int count = (Integer) usage.getOrDefault(type, 0);
usage.put(type, count + 1);
}

View File

@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.monitoring.agent.AgentService;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
public class MonitoringModule extends AbstractModule {
private final boolean enabled;
private final boolean transportClientMode;
public MonitoringModule(boolean enabled, boolean transportClientMode) {
this.enabled = enabled;
this.transportClientMode = transportClientMode;
}
@Override
protected void configure() {
XPackPlugin.bindFeatureSet(binder(), MonitoringFeatureSet.class);
if (enabled && transportClientMode == false) {
bind(MonitoringSettings.class).asEagerSingleton();
bind(AgentService.class).asEagerSingleton();
bind(CleanerService.class).asEagerSingleton();
}
}
}

View File

@ -34,7 +34,6 @@ public class MonitoringSettings extends AbstractComponent {
* The minimum amount of time allowed for the history duration.
*/
public static final TimeValue HISTORY_DURATION_MINIMUM = TimeValue.timeValueHours(24);
public static final TimeValue MAX_LICENSE_GRACE_PERIOD = TimeValue.timeValueHours(7 * 24);
/**
* Determines whether monitoring is enabled/disabled
@ -153,7 +152,6 @@ public class MonitoringSettings extends AbstractComponent {
private volatile boolean recoveryActiveOnly;
private volatile String[] indices;
@Inject
public MonitoringSettings(Settings settings, ClusterSettings clusterSettings) {
super(settings);

View File

@ -51,7 +51,6 @@ public class AgentService extends AbstractLifecycleComponent {
private final String[] settingsCollectors;
private final Exporters exporters;
@Inject
public AgentService(Settings settings, ClusterSettings clusterSettings, Set<Collector> collectors, Exporters exporters) {
super(settings);
this.samplingIntervalMillis = MonitoringSettings.INTERVAL.get(settings).millis();

View File

@ -27,7 +27,6 @@ public abstract class AbstractCollector extends AbstractLifecycleComponent imple
protected final MonitoringSettings monitoringSettings;
protected final XPackLicenseState licenseState;
@Inject
public AbstractCollector(Settings settings, String name, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState) {
super(settings);

View File

@ -1,48 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.agent.collector;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndicesStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.xpack.monitoring.agent.collector.shards.ShardsCollector;
import java.util.HashSet;
import java.util.Set;
public class CollectorModule extends AbstractModule {
private final Set<Class<? extends Collector>> collectors = new HashSet<>();
public CollectorModule() {
// Registers default collectors
registerCollector(IndicesStatsCollector.class);
registerCollector(IndexStatsCollector.class);
registerCollector(ClusterStatsCollector.class);
registerCollector(ClusterStateCollector.class);
registerCollector(ShardsCollector.class);
registerCollector(NodeStatsCollector.class);
registerCollector(IndexRecoveryCollector.class);
}
@Override
protected void configure() {
Multibinder<Collector> binder = Multibinder.newSetBinder(binder(), Collector.class);
for (Class<? extends Collector> collector : collectors) {
bind(collector).asEagerSingleton();
binder.addBinding().to(collector);
}
}
public void registerCollector(Class<? extends Collector> collector) {
collectors.add(collector);
}
}

View File

@ -5,13 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.cluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
@ -19,11 +23,6 @@ import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for cluster state.
* <p>
@ -36,7 +35,6 @@ public class ClusterStateCollector extends AbstractCollector {
private final Client client;
@Inject
public ClusterStateCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,13 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.cluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.core.LicenseService;
import org.elasticsearch.license.plugin.core.LicenseUtils;
@ -21,11 +25,6 @@ import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for cluster stats.
* <p>
@ -43,7 +42,6 @@ public class ClusterStatsCollector extends AbstractCollector {
private final LicenseService licenseService;
private final Client client;
@Inject
public ClusterStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client,
LicenseService licenseService) {

View File

@ -5,12 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.indices;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
@ -20,12 +25,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for the Recovery API.
* <p>
@ -38,7 +37,6 @@ public class IndexRecoveryCollector extends AbstractCollector {
private final Client client;
@Inject
public IndexRecoveryCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,6 +5,12 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.indices;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.IndicesOptions;
@ -12,7 +18,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
@ -22,12 +27,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for indices statistics.
* <p>
@ -40,7 +39,6 @@ public class IndexStatsCollector extends AbstractCollector {
private final Client client;
@Inject
public IndexStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,12 +5,15 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.indices;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
@ -20,10 +23,6 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
/**
* Collector for indices statistics.
* <p>
@ -35,7 +34,6 @@ public class IndicesStatsCollector extends AbstractCollector {
private final Client client;
@Inject
public IndicesStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.node;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@ -14,18 +18,16 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.common.unit.RatioValue;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Collection;
import java.util.Collections;
/**
* Collector for nodes statistics.
* <p>
@ -37,29 +39,11 @@ public class NodeStatsCollector extends AbstractCollector {
public static final String NAME = "node-stats-collector";
private final Client client;
private final NodeEnvironment nodeEnvironment;
private final DiskThresholdDecider diskThresholdDecider;
@Inject
public NodeStatsCollector(Settings settings, ClusterService clusterService, MonitoringSettings monitoringSettings,
XPackLicenseState licenseState, InternalClient client,
NodeEnvironment nodeEnvironment, DiskThresholdDecider diskThresholdDecider) {
XPackLicenseState licenseState, InternalClient client) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);
this.client = client;
this.nodeEnvironment = nodeEnvironment;
this.diskThresholdDecider = diskThresholdDecider;
}
@Override
protected boolean shouldCollect() {
// In some cases, the collector starts to collect nodes stats but the
// NodeEnvironment is not fully initialized (NodePath is null) and can fail.
// This why we need to check for nodeEnvironment.hasNodeFile() here, but only
// for nodes that can hold data. Client nodes can collect nodes stats because
// elasticsearch correctly handles the nodes stats for client nodes.
return super.shouldCollect()
&& (DiscoveryNode.nodeRequiresLocalStorage(settings) == false || nodeEnvironment.hasNodeFile());
}
@Override
@ -80,12 +64,6 @@ public class NodeStatsCollector extends AbstractCollector {
}
NodeStats nodeStats = response.getNodes().get(0);
// Here we are calling directly the DiskThresholdDecider to retrieve the high watermark value
// It would be nicer to use a settings API like documented in #6732
Double diskThresholdWatermarkHigh = (diskThresholdDecider != null) ? 100.0 - diskThresholdDecider.getFreeDiskThresholdHigh() : -1;
boolean diskThresholdDeciderEnabled = (diskThresholdDecider != null) && diskThresholdDecider.isEnabled();
DiscoveryNode sourceNode = localNode();
NodeStatsMonitoringDoc nodeStatsDoc = new NodeStatsMonitoringDoc(monitoringId(), monitoringVersion());
@ -96,8 +74,6 @@ public class NodeStatsCollector extends AbstractCollector {
nodeStatsDoc.setNodeMaster(isLocalNodeMaster());
nodeStatsDoc.setNodeStats(nodeStats);
nodeStatsDoc.setMlockall(BootstrapInfo.isMemoryLocked());
nodeStatsDoc.setDiskThresholdWaterMarkHigh(diskThresholdWatermarkHigh);
nodeStatsDoc.setDiskThresholdDeciderEnabled(diskThresholdDeciderEnabled);
return Collections.singletonList(nodeStatsDoc);
}

View File

@ -13,10 +13,7 @@ public class NodeStatsMonitoringDoc extends MonitoringDoc {
private String nodeId;
private boolean nodeMaster;
private NodeStats nodeStats;
private boolean mlockall;
private Double diskThresholdWaterMarkHigh;
private boolean diskThresholdDeciderEnabled;
public NodeStatsMonitoringDoc(String monitoringId, String monitoringVersion) {
super(monitoringId, monitoringVersion);
@ -38,14 +35,6 @@ public class NodeStatsMonitoringDoc extends MonitoringDoc {
this.mlockall = mlockall;
}
public void setDiskThresholdWaterMarkHigh(Double diskThresholdWaterMarkHigh) {
this.diskThresholdWaterMarkHigh = diskThresholdWaterMarkHigh;
}
public void setDiskThresholdDeciderEnabled(boolean diskThresholdDeciderEnabled) {
this.diskThresholdDeciderEnabled = diskThresholdDeciderEnabled;
}
public String getNodeId() {
return nodeId;
}
@ -61,13 +50,5 @@ public class NodeStatsMonitoringDoc extends MonitoringDoc {
public boolean isMlockall() {
return mlockall;
}
public Double getDiskThresholdWaterMarkHigh() {
return diskThresholdWaterMarkHigh;
}
public boolean isDiskThresholdDeciderEnabled() {
return diskThresholdDeciderEnabled;
}
}

View File

@ -5,12 +5,17 @@
*/
package org.elasticsearch.xpack.monitoring.agent.collector.shards;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
@ -18,12 +23,6 @@ import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.AbstractCollector;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Collector for shards.
* <p>
@ -34,7 +33,6 @@ public class ShardsCollector extends AbstractCollector {
public static final String NAME = "shards-collector";
@Inject
public ShardsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings, XPackLicenseState licenseState) {
super(settings, NAME, clusterService, monitoringSettings, licenseState);

View File

@ -19,7 +19,6 @@ public abstract class Exporter implements AutoCloseable {
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
protected final String type;
protected final Config config;
protected final ESLogger logger;
@ -27,17 +26,12 @@ public abstract class Exporter implements AutoCloseable {
private AtomicBoolean closed = new AtomicBoolean(false);
public Exporter(String type, Config config) {
this.type = type;
public Exporter(Config config) {
this.config = config;
this.logger = config.logger(getClass());
this.bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
}
public String type() {
return type;
}
public String name() {
return config.name;
}
@ -50,6 +44,11 @@ public abstract class Exporter implements AutoCloseable {
return false;
}
/** Returns true if only one instance of this exporter should be allowed. */
public boolean isSingleton() {
return false;
}
/**
* Opens up a new export bulk. May return {@code null} indicating this exporter is not ready
* yet to export the docs
@ -76,12 +75,14 @@ public abstract class Exporter implements AutoCloseable {
public static class Config {
private final String name;
private final String type;
private final boolean enabled;
private final Settings globalSettings;
private final Settings settings;
public Config(String name, Settings globalSettings, Settings settings) {
public Config(String name, String type, Settings globalSettings, Settings settings) {
this.name = name;
this.type = type;
this.globalSettings = globalSettings;
this.settings = settings;
this.enabled = settings.getAsBoolean("enabled", true);
@ -91,6 +92,10 @@ public abstract class Exporter implements AutoCloseable {
return name;
}
public String type() {
return type;
}
public boolean enabled() {
return enabled;
}
@ -104,24 +109,10 @@ public abstract class Exporter implements AutoCloseable {
}
}
public abstract static class Factory<E extends Exporter> {
/** A factory for constructing {@link Exporter} instances.*/
public interface Factory {
private final String type;
private final boolean singleton;
public Factory(String type, boolean singleton) {
this.type = type;
this.singleton = singleton;
}
public String type() {
return type;
}
public boolean singleton() {
return singleton;
}
public abstract E create(Config config);
/** Create an exporter with the given configuration. */
Exporter create(Config config);
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.agent.exporter;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.monitoring.Monitoring;
import org.elasticsearch.xpack.monitoring.agent.exporter.http.HttpExporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.local.LocalExporter;
import java.util.HashMap;
import java.util.Map;
public class ExporterModule extends AbstractModule {
private final Settings settings;
private final Map<String, Class<? extends Exporter.Factory<? extends Exporter>>> exporterFactories = new HashMap<>();
public ExporterModule(Settings settings) {
this.settings = settings;
registerExporter(HttpExporter.TYPE, HttpExporter.Factory.class);
registerExporter(LocalExporter.TYPE, LocalExporter.Factory.class);
}
@Override
protected void configure() {
if (Monitoring.enabled(settings) && XPackPlugin.transportClientMode(settings) == false
&& XPackPlugin.isTribeNode(settings) == false) {
bind(Exporters.class).asEagerSingleton();
MapBinder<String, Exporter.Factory> factoryBinder = MapBinder.newMapBinder(binder(), String.class, Exporter.Factory.class);
for (Map.Entry<String, Class<? extends Exporter.Factory<? extends Exporter>>> entry : exporterFactories.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
factoryBinder.addBinding(entry.getKey()).to(entry.getValue());
}
} else {
bind(Exporters.class).toProvider(Providers.of(null));
}
}
public void registerExporter(String type, Class<? extends Exporter.Factory<? extends Exporter>> factory) {
exporterFactories.put(type, factory);
}
}

View File

@ -8,9 +8,7 @@ package org.elasticsearch.xpack.monitoring.agent.exporter;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.node.Node;
@ -39,16 +37,15 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
private final AtomicReference<Map<String, Exporter>> exporters;
@Inject
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
ClusterService clusterService,
ClusterSettings clusterSettings) {
ClusterService clusterService) {
super(settings);
this.factories = factories;
this.clusterService = clusterService;
this.exporters = new AtomicReference<>(emptyMap());
clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS, this::setExportersSetting);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS,
this::setExportersSetting);
}
private void setExportersSetting(Settings exportersSetting) {
@ -135,7 +132,7 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
if (factory == null) {
throw new SettingsException("unknown exporter type [" + type + "] set for exporter [" + name + "]");
}
Exporter.Config config = new Exporter.Config(name, globalSettings, exporterSettings);
Exporter.Config config = new Exporter.Config(name, type, globalSettings, exporterSettings);
if (!config.enabled()) {
hasDisabled = true;
if (logger.isDebugEnabled()) {
@ -143,8 +140,9 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
}
continue;
}
if (factory.singleton()) {
// this is a singleton exporter factory, let's make sure we didn't already registered one
Exporter exporter = factory.create(config);
if (exporter.isSingleton()) {
// this is a singleton exporter, let's make sure we didn't already create one
// (there can only be one instance of a singleton exporter)
if (singletons.contains(type)) {
throw new SettingsException("multiple [" + type + "] exporters are configured. there can " +
@ -152,7 +150,7 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
}
singletons.add(type);
}
exporters.put(config.name(), factory.create(config));
exporters.put(config.name(), exporter);
}
// no exporters are configured, lets create a default local one.
@ -161,7 +159,8 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
// fallback on the default
//
if (exporters.isEmpty() && !hasDisabled) {
Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, globalSettings, Settings.EMPTY);
Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, LocalExporter.TYPE,
globalSettings, Settings.EMPTY);
exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config));
}

View File

@ -111,9 +111,8 @@ public class HttpExporter extends Exporter {
final ConnectionKeepAliveWorker keepAliveWorker;
Thread keepAliveThread;
public HttpExporter(Exporter.Config config, Environment env) {
super(TYPE, config);
public HttpExporter(Config config, Environment env) {
super(config);
this.env = env;
hosts = config.settings().getAsArray(HOST_SETTING, Strings.EMPTY_ARRAY);
@ -754,20 +753,4 @@ public class HttpExporter extends Exporter {
}
}
}
public static class Factory extends Exporter.Factory<HttpExporter> {
private final Environment env;
@Inject
public Factory(Environment env) {
super(TYPE, false);
this.env = env;
}
@Override
public HttpExporter create(Config config) {
return new HttpExporter(config, env);
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.monitoring.agent.exporter.ExportException;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.agent.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.agent.resolver.ResolversRegistry;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Arrays;
import java.util.Collection;
@ -28,13 +29,13 @@ import java.util.Collection;
public class LocalBulk extends ExportBulk {
private final ESLogger logger;
private final ClientProxy client;
private final InternalClient client;
private final ResolversRegistry resolvers;
private BulkRequestBuilder requestBuilder;
public LocalBulk(String name, ESLogger logger, ClientProxy client, ResolversRegistry resolvers) {
public LocalBulk(String name, ESLogger logger, InternalClient client, ResolversRegistry resolvers) {
super(name);
this.logger = logger;
this.client = client;

View File

@ -51,16 +51,16 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
public static final String TYPE = "local";
private final ClientProxy client;
private final InternalClient client;
private final ClusterService clusterService;
private final ResolversRegistry resolvers;
private final CleanerService cleanerService;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
public LocalExporter(Exporter.Config config, ClientProxy client,
public LocalExporter(Exporter.Config config, InternalClient client,
ClusterService clusterService, CleanerService cleanerService) {
super(TYPE, config);
super(config);
this.client = client;
this.clusterService = clusterService;
this.cleanerService = cleanerService;
@ -302,26 +302,6 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
});
}
public static class Factory extends Exporter.Factory<LocalExporter> {
private final ClientProxy client;
private final ClusterService clusterService;
private final CleanerService cleanerService;
@Inject
public Factory(InternalClient client, ClusterService clusterService, CleanerService cleanerService) {
super(TYPE, true);
this.client = new ClientProxy(client);
this.clusterService = clusterService;
this.cleanerService = cleanerService;
}
@Override
public LocalExporter create(Config config) {
return new LocalExporter(config, client, clusterService, cleanerService);
}
}
enum State {
INITIALIZED,
RUNNING,

View File

@ -28,8 +28,6 @@ public class NodeStatsResolver extends MonitoringIndexNameResolver.Timestamped<N
"node_stats.node_id",
"node_stats.node_master",
"node_stats.mlockall",
"node_stats.disk_threshold_enabled",
"node_stats.disk_threshold_watermark_high",
// Node Stats
"node_stats.indices.docs.count",
"node_stats.indices.fielddata.memory_size_in_bytes",
@ -119,8 +117,6 @@ public class NodeStatsResolver extends MonitoringIndexNameResolver.Timestamped<N
builder.field(Fields.NODE_ID, document.getNodeId());
builder.field(Fields.NODE_MASTER, document.isNodeMaster());
builder.field(Fields.MLOCKALL, document.isMlockall());
builder.field(Fields.DISK_THRESHOLD_ENABLED, document.isDiskThresholdDeciderEnabled());
builder.field(Fields.DISK_THRESHOLD_WATERMARK_HIGH, document.getDiskThresholdWaterMarkHigh());
NodeStats nodeStats = document.getNodeStats();
if (nodeStats != null) {
@ -135,7 +131,5 @@ public class NodeStatsResolver extends MonitoringIndexNameResolver.Timestamped<N
static final String NODE_ID = "node_id";
static final String NODE_MASTER = "node_master";
static final String MLOCKALL = "mlockall";
static final String DISK_THRESHOLD_ENABLED = "disk_threshold_enabled";
static final String DISK_THRESHOLD_WATERMARK_HIGH = "disk_threshold_watermark_high";
}
}

View File

@ -5,8 +5,11 @@
*/
package org.elasticsearch.xpack.monitoring.cleaner;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -19,10 +22,6 @@ import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
* {@code CleanerService} takes care of deleting old monitoring indices.
*/
@ -49,7 +48,6 @@ public class CleanerService extends AbstractLifecycleComponent {
clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.HISTORY_DURATION, this::setGlobalRetention);
}
@Inject
public CleanerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, XPackLicenseState licenseState) {
this(settings, clusterSettings, licenseState, threadPool, new DefaultExecutionScheduler());
}

View File

@ -1,16 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.client;
import org.elasticsearch.common.inject.AbstractModule;
public class MonitoringClientModule extends AbstractModule {
@Override
protected void configure() {
bind(MonitoringClient.class).asEagerSingleton();
}
}

View File

@ -435,12 +435,6 @@
"mlockall": {
"type": "boolean"
},
"disk_threshold_enabled": {
"type": "boolean"
},
"disk_threshold_watermark_high": {
"type": "short"
},
"indices": {
"properties": {
"docs": {

View File

@ -128,10 +128,10 @@ public class MonitoringFeatureSetTests extends ESTestCase {
private Exporter mockExporter(String type, boolean enabled) {
Exporter exporter = mock(Exporter.class);
when(exporter.type()).thenReturn(type);
Exporter.Config enabledConfig = mock(Exporter.Config.class);
when(enabledConfig.enabled()).thenReturn(enabled);
when(exporter.config()).thenReturn(enabledConfig);
when(enabledConfig.type()).thenReturn(type);
return exporter;
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.is;
@ -16,10 +17,11 @@ public class MonitoringPluginClientTests extends ESTestCase {
public void testModulesWithClientSettings() {
Settings settings = Settings.builder()
.put("path.home", createTempDir())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE)
.build();
Monitoring plugin = new Monitoring(settings);
Monitoring plugin = new Monitoring(settings, new Environment(settings), null);
assertThat(plugin.isEnabled(), is(true));
assertThat(plugin.isTransportClient(), is(true));
}
@ -27,9 +29,10 @@ public class MonitoringPluginClientTests extends ESTestCase {
public void testModulesWithNodeSettings() {
// these settings mimic what ES does when running as a node...
Settings settings = Settings.builder()
.put("path.home", createTempDir())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), "node")
.build();
Monitoring plugin = new Monitoring(settings);
Monitoring plugin = new Monitoring(settings, new Environment(settings), null);
assertThat(plugin.isEnabled(), is(true));
assertThat(plugin.isTransportClient(), is(false));
}

View File

@ -104,7 +104,7 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
fail("should have thrown an exception about missing implementation");
} catch (Exception ce) {
assertThat("message contains error about missing implementation: " + ce.getMessage(),
ce.getMessage().contains("No implementation"), equalTo(true));
ce.getMessage().contains("Could not find a suitable constructor"), equalTo(true));
}
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -42,6 +43,8 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -87,9 +90,12 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
CapturingTransport transport = new CapturingTransport();
Set<Setting<?>> clusterSettings = new HashSet<>();
clusterSettings.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings.add(MonitoringSettings.EXPORTERS_SETTINGS);
clusterService = new ClusterService(Settings.builder().put("cluster.name",
TransportMonitoringBulkActionTests.class.getName()).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
new ClusterSettings(Settings.EMPTY, clusterSettings), threadPool);
clusterService.setLocalNode(new DiscoveryNode("node", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(),
Version.CURRENT));
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@ -257,8 +263,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private final Collection<MonitoringDoc> exported = ConcurrentCollections.newConcurrentSet();
public CapturingExporters() {
super(Settings.EMPTY, Collections.emptyMap(), clusterService,
new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.EXPORTERS_SETTINGS)));
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
}
@Override
@ -279,8 +284,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private final Consumer<Collection<? extends MonitoringDoc>> consumer;
public ConsumingExporters(Consumer<Collection<? extends MonitoringDoc>> consumer) {
super(Settings.EMPTY, Collections.emptyMap(), clusterService,
new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.EXPORTERS_SETTINGS)));
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
this.consumer = consumer;
}

View File

@ -56,8 +56,6 @@ public class NodeStatsCollectorTests extends AbstractCollectorTestCase {
equalTo(internalCluster().getInstance(ClusterService.class, node).localNode().getId()));
assertThat(nodeStatsMonitoringDoc.isNodeMaster(), equalTo(node.equals(internalCluster().getMasterName())));
assertThat(nodeStatsMonitoringDoc.isMlockall(), equalTo(BootstrapInfo.isMemoryLocked()));
assertNotNull(nodeStatsMonitoringDoc.isDiskThresholdDeciderEnabled());
assertNotNull(nodeStatsMonitoringDoc.getDiskThresholdWaterMarkHigh());
assertNotNull(nodeStatsMonitoringDoc.getNodeStats());
}
@ -68,8 +66,6 @@ public class NodeStatsCollectorTests extends AbstractCollectorTestCase {
internalCluster().getInstance(ClusterService.class, nodeId),
internalCluster().getInstance(MonitoringSettings.class, nodeId),
internalCluster().getInstance(XPackLicenseState.class, nodeId),
internalCluster().getInstance(InternalClient.class, nodeId),
internalCluster().getInstance(NodeEnvironment.class, nodeId),
internalCluster().getInstance(DiskThresholdDecider.class, nodeId));
internalCluster().getInstance(InternalClient.class, nodeId));
}
}

View File

@ -6,14 +6,18 @@
package org.elasticsearch.xpack.monitoring.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.core.LicenseService;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.Collector;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Map;
@ -114,8 +118,14 @@ public abstract class AbstractExporterTemplateTestCase extends MonitoringIntegTe
}
protected void doExporting() throws Exception {
Collector collector = internalCluster().getInstance(ClusterStatsCollector.class);
assertNotNull(collector);
// TODO: these should be unit tests, not using guice
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
XPackLicenseState licenseState = internalCluster().getInstance(XPackLicenseState.class);
LicenseService licenseService = internalCluster().getInstance(LicenseService.class);
InternalClient client = internalCluster().getInstance(InternalClient.class);
Collector collector = new ClusterStatsCollector(clusterService.getSettings(), clusterService,
new MonitoringSettings(clusterService.getSettings(), clusterService.getClusterSettings()),
licenseState, client, licenseService);
Exporters exporters = internalCluster().getInstance(Exporters.class);
assertNotNull(exporters);

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.exporter.local.LocalExporter;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.security.InternalClient;
import org.junit.Before;
import java.util.ArrayList;
@ -64,21 +65,21 @@ public class ExportersTests extends ESTestCase {
public void init() throws Exception {
factories = new HashMap<>();
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
InternalClient client = mock(InternalClient.class);
clusterService = mock(ClusterService.class);
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(MonitoringSettings.COLLECTORS,
MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
// we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(ClientProxy.fromClient(client), clusterService,
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService,
mock(CleanerService.class)));
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(MonitoringSettings.COLLECTORS,
MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));
exporters = new Exporters(Settings.EMPTY, factories, clusterService, clusterSettings);
exporters = new Exporters(Settings.EMPTY, factories, clusterService);
}
public void testInitExportersDefault() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder().build());
assertThat(internalExporters, notNullValue());
@ -88,8 +89,7 @@ public class ExportersTests extends ESTestCase {
}
public void testInitExportersSingle() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.build());
@ -97,13 +97,12 @@ public class ExportersTests extends ESTestCase {
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.size(), is(1));
assertThat(internalExporters, hasKey("_name"));
assertThat(internalExporters.get("_name"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name").type, is("_type"));
assertThat(internalExporters.get("_name"), instanceOf(TestExporter.class));
assertThat(internalExporters.get("_name").config().type(), is("_type"));
}
public void testInitExportersSingleDisabled() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.put("_name.enabled", false)
@ -138,8 +137,7 @@ public class ExportersTests extends ESTestCase {
}
public void testInitExportersMultipleSameType() throws Exception {
Exporter.Factory factory = new TestFactory("_type", false);
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
@ -148,30 +146,26 @@ public class ExportersTests extends ESTestCase {
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.size(), is(2));
assertThat(internalExporters, hasKey("_name0"));
assertThat(internalExporters.get("_name0"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name0").type, is("_type"));
assertThat(internalExporters.get("_name0"), instanceOf(TestExporter.class));
assertThat(internalExporters.get("_name0").config().type(), is("_type"));
assertThat(internalExporters, hasKey("_name1"));
assertThat(internalExporters.get("_name1"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name1").type, is("_type"));
assertThat(internalExporters.get("_name1"), instanceOf(TestExporter.class));
assertThat(internalExporters.get("_name1").config().type(), is("_type"));
}
public void testInitExportersMultipleSameTypeSingletons() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
try {
factories.put("_type", TestSingletonExporter::new);
SettingsException e = expectThrows(SettingsException.class, () ->
exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
.build());
fail("Expected SettingsException");
} catch (SettingsException e) {
assertThat(e.getMessage(), containsString("multiple [_type] exporters are configured. there can only be one"));
}
.build())
);
assertThat(e.getMessage(), containsString("multiple [_type] exporters are configured. there can only be one"));
}
public void testSettingsUpdate() throws Exception {
Exporter.Factory factory = spy(new TestFactory("_type", false));
factories.put("_type", factory);
factories.put("_type", TestExporter::new);
final AtomicReference<Settings> settingsHolder = new AtomicReference<>();
@ -180,8 +174,9 @@ public class ExportersTests extends ESTestCase {
.put("xpack.monitoring.collection.exporters._name1.type", "_type")
.build();
clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Arrays.asList(MonitoringSettings.EXPORTERS_SETTINGS)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
exporters = new Exporters(nodeSettings, factories, clusterService, clusterSettings) {
exporters = new Exporters(nodeSettings, factories, clusterService) {
@Override
Map<String, Exporter> initExporters(Settings settings) {
settingsHolder.set(settings);
@ -211,14 +206,14 @@ public class ExportersTests extends ESTestCase {
}
public void testOpenBulkOnMaster() throws Exception {
Exporter.Factory factory = new MockFactory("mock", false);
Exporter.Factory masterOnlyFactory = new MockFactory("mock_master_only", true);
Exporter.Factory factory = new MockFactory(false);
Exporter.Factory masterOnlyFactory = new MockFactory(true);
factories.put("mock", factory);
factories.put("mock_master_only", masterOnlyFactory);
Exporters exporters = new Exporters(Settings.builder()
.put("xpack.monitoring.collection.exporters._name0.type", "mock")
.put("xpack.monitoring.collection.exporters._name1.type", "mock_master_only")
.build(), factories, clusterService, clusterSettings);
.build(), factories, clusterService);
exporters.start();
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
@ -236,14 +231,14 @@ public class ExportersTests extends ESTestCase {
}
public void testExportNotOnMaster() throws Exception {
Exporter.Factory factory = new MockFactory("mock", false);
Exporter.Factory masterOnlyFactory = new MockFactory("mock_master_only", true);
Exporter.Factory factory = new MockFactory(false);
Exporter.Factory masterOnlyFactory = new MockFactory(true);
factories.put("mock", factory);
factories.put("mock_master_only", masterOnlyFactory);
Exporters exporters = new Exporters(Settings.builder()
.put("xpack.monitoring.collection.exporters._name0.type", "mock")
.put("xpack.monitoring.collection.exporters._name1.type", "mock_master_only")
.build(), factories, clusterService, clusterSettings);
.build(), factories, clusterService);
exporters.start();
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
@ -257,6 +252,7 @@ public class ExportersTests extends ESTestCase {
verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).openBulk();
verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verify(exporters.getExporter("_name1"), times(1)).isSingleton();
verifyNoMoreInteractions(exporters.getExporter("_name1"));
}
@ -273,10 +269,9 @@ public class ExportersTests extends ESTestCase {
settings.put("xpack.monitoring.collection.exporters._name" + String.valueOf(i) + ".type", "record");
}
Exporter.Factory factory = new CountingExportFactory("record", false);
factories.put("record", factory);
factories.put("record", CountingExporter::new);
Exporters exporters = new Exporters(settings.build(), factories, clusterService, clusterSettings);
Exporters exporters = new Exporters(settings.build(), factories, clusterService);
exporters.start();
final Thread[] threads = new Thread[3 + randomInt(7)];
@ -327,51 +322,50 @@ public class ExportersTests extends ESTestCase {
assertThat(exceptions, empty());
for (Exporter exporter : exporters) {
assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class));
assertThat(((CountingExportFactory.CountingExporter) exporter).getExportedCount(), equalTo(total));
assertThat(exporter, instanceOf(CountingExporter.class));
assertThat(((CountingExporter) exporter).getExportedCount(), equalTo(total));
}
exporters.close();
}
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
public TestFactory(String type, boolean singleton) {
super(type, singleton);
static class TestExporter extends Exporter {
public TestExporter(Config config) {
super(config);
}
@Override
public TestExporter create(Exporter.Config config) {
return new TestExporter(type(), config);
public ExportBulk openBulk() {
return mock(ExportBulk.class);
}
static class TestExporter extends Exporter {
public TestExporter(String type, Config config) {
super(type, config);
}
@Override
public ExportBulk openBulk() {
return mock(ExportBulk.class);
}
@Override
public void doClose() {
}
@Override
public void doClose() {
}
}
static class MockFactory extends Exporter.Factory<Exporter> {
static class TestSingletonExporter extends TestExporter {
TestSingletonExporter(Config config) {
super(config);
}
@Override
public boolean isSingleton() {
return true;
}
}
static class MockFactory implements Exporter.Factory {
private final boolean masterOnly;
public MockFactory(String type, boolean masterOnly) {
super(type, false);
public MockFactory(boolean masterOnly) {
this.masterOnly = masterOnly;
}
@Override
public Exporter create(Exporter.Config config) {
Exporter exporter = mock(Exporter.class);
when(exporter.type()).thenReturn(type());
when(exporter.name()).thenReturn(config.name());
when(exporter.masterOnly()).thenReturn(masterOnly);
when(exporter.openBulk()).thenReturn(mock(ExportBulk.class));
@ -379,73 +373,58 @@ public class ExportersTests extends ESTestCase {
}
}
/**
* A factory of exporters that count the number of exported documents.
*/
static class CountingExportFactory extends Exporter.Factory<CountingExportFactory.CountingExporter> {
static class CountingExporter extends Exporter {
public CountingExportFactory(String type, boolean singleton) {
super(type, singleton);
private static final AtomicInteger count = new AtomicInteger(0);
private List<CountingBulk> bulks = new CopyOnWriteArrayList<>();
public CountingExporter(Config config) {
super(config);
}
@Override
public CountingExporter create(Exporter.Config config) {
return new CountingExporter(type(), config);
public ExportBulk openBulk() {
CountingBulk bulk = new CountingBulk(config.type() + "#" + count.getAndIncrement());
bulks.add(bulk);
return bulk;
}
static class CountingExporter extends Exporter {
private static final AtomicInteger count = new AtomicInteger(0);
private List<CountingBulk> bulks = new CopyOnWriteArrayList<>();
public CountingExporter(String type, Config config) {
super(type, config);
}
@Override
public ExportBulk openBulk() {
CountingBulk bulk = new CountingBulk(type + "#" + count.getAndIncrement());
bulks.add(bulk);
return bulk;
}
@Override
public void doClose() {
}
public int getExportedCount() {
int exported = 0;
for (CountingBulk bulk : bulks) {
exported += bulk.getCount();
}
return exported;
}
@Override
public void doClose() {
}
static class CountingBulk extends ExportBulk {
private final AtomicInteger count = new AtomicInteger();
public CountingBulk(String name) {
super(name);
public int getExportedCount() {
int exported = 0;
for (CountingBulk bulk : bulks) {
exported += bulk.getCount();
}
return exported;
}
}
@Override
protected void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
count.addAndGet(docs.size());
}
static class CountingBulk extends ExportBulk {
@Override
protected void doFlush() {
}
private final AtomicInteger count = new AtomicInteger();
@Override
protected void doClose() throws ExportException {
}
public CountingBulk(String name) {
super(name);
}
int getCount() {
return count.get();
}
@Override
protected void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
count.addAndGet(docs.size());
}
@Override
protected void doFlush() {
}
@Override
protected void doClose() throws ExportException {
}
int getCount() {
return count.get();
}
}
}

View File

@ -63,8 +63,6 @@ public class NodeStatsResolverTests extends MonitoringIndexNameResolverTestCase<
doc.setSourceNode(new DiscoveryNode("id", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT));
doc.setNodeMaster(randomBoolean());
doc.setNodeId(UUID.randomUUID().toString());
doc.setDiskThresholdDeciderEnabled(randomBoolean());
doc.setDiskThresholdWaterMarkHigh(randomDouble());
doc.setMlockall(randomBoolean());
doc.setNodeStats(randomNodeStats());
return doc;

View File

@ -38,6 +38,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.plugin.Licensing;
import org.elasticsearch.license.plugin.core.LicenseService;
import org.elasticsearch.license.plugin.core.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
@ -139,7 +140,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
this.licensing = new Licensing(settings);
this.security = new Security(settings, env, licenseState);
this.monitoring = new Monitoring(settings);
this.monitoring = new Monitoring(settings, env, licenseState);
this.watcher = new Watcher(settings);
this.graph = new Graph(settings);
this.notification = new Notification(settings);
@ -173,8 +174,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
if (transportClientMode == false) {
modules.add(new TextTemplateModule());
// Note: this only exists so LicenseService subclasses can be bound in mock tests
modules.addAll(licensing.nodeModules());
} else {
modules.add(b -> b.bind(XPackLicenseState.class).toProvider(Providers.of(null)));
}
@ -185,7 +184,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
ArrayList<Class<? extends LifecycleComponent>> services = new ArrayList<>();
services.addAll(notification.nodeServices());
services.addAll(monitoring.nodeServices());
return services;
}
@ -196,9 +194,14 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
final InternalClient internalClient = new InternalClient(settings, threadPool, client, security.getCryptoService());
components.add(internalClient);
components.addAll(licensing.createComponents(clusterService, getClock(), env, resourceWatcherService, licenseState));
LicenseService licenseService = new LicenseService(settings, clusterService, getClock(),
env, resourceWatcherService, licenseState);
components.add(licenseService);
components.add(licenseState);
components.addAll(security.createComponents(internalClient, threadPool, clusterService, resourceWatcherService,
extensionsService.getExtensions()));
components.addAll(monitoring.createComponents(internalClient, threadPool, clusterService, licenseService));
// watcher http stuff
Map<String, HttpAuthFactory> httpAuthFactories = new HashMap<>();

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -18,6 +19,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
@ -192,8 +194,10 @@ public class IndexActionTests extends ESIntegTestCase {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout);
}
builder.endObject();
Client client = client();
InternalClient internalClient = new InternalClient(client.settings(), client.threadPool(), client, null);
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.fromClient(client()));
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, internalClient);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -221,7 +225,10 @@ public class IndexActionTests extends ESIntegTestCase {
}
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY,ClientProxy.fromClient(client()));
Client client = client();
InternalClient internalClient = new InternalClient(client.settings(), client.threadPool(), client, null);
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, internalClient);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
try {