diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java index 5f474b7b3ba..f8ff2a06a27 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java @@ -120,10 +120,6 @@ public class AgentService extends AbstractLifecycleComponent { @Override protected void doStart() { logger.debug("monitoring service started"); - - for (Collector collector : collectors) { - collector.start(); - } exporters.start(); applyIntervalSettings(); } @@ -140,19 +136,11 @@ public class AgentService extends AbstractLifecycleComponent { } } - for (Collector collector : collectors) { - collector.stop(); - } - exporters.stop(); } @Override protected void doClose() { - for (Collector collector : collectors) { - collector.close(); - } - for (Exporter exporter : exporters) { try { exporter.close(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollector.java deleted file mode 100644 index fa84737f724..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollector.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.monitoring.collector; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.xpack.monitoring.MonitoredSystem; -import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; - -import java.util.Collection; - -public abstract class AbstractCollector extends AbstractLifecycleComponent implements Collector { - - private final String name; - - protected final ClusterService clusterService; - protected final MonitoringSettings monitoringSettings; - protected final XPackLicenseState licenseState; - - public AbstractCollector(Settings settings, String name, ClusterService clusterService, - MonitoringSettings monitoringSettings, XPackLicenseState licenseState) { - super(settings); - this.name = name; - this.clusterService = clusterService; - this.monitoringSettings = monitoringSettings; - this.licenseState = licenseState; - } - - @Override - public String name() { - return name; - } - - @Override - public String toString() { - return name; - } - - @Override - public void start() { - logger.debug("starting collector [{}]", name()); - super.start(); - } - - @Override - protected void doStart() { - } - - /** - * Indicates if the current collector is allowed to collect data - */ - protected boolean shouldCollect() { - if (licenseState.isMonitoringAllowed() == false) { - logger.trace("collector [{}] can not collect data due to invalid license", name()); - return false; - } - return true; - } - - protected boolean isLocalNodeMaster() { - return clusterService.state().nodes().isLocalNodeElectedMaster(); - } - - @Override - public Collection collect() { - try { - if (shouldCollect()) { - logger.trace("collector [{}] - collecting data...", name()); - return doCollect(); - } - } catch (ElasticsearchTimeoutException e) { - logger.error("collector [{}] timed out when collecting data", name()); - } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("collector [{}] - failed collecting data", name()), e); - } - return null; - } - - protected abstract Collection doCollect() throws Exception; - - @Override - public void stop() { - logger.debug("stopping collector [{}]", name()); - super.stop(); - } - - @Override - protected void doStop() { - } - - @Override - public void close() { - logger.trace("closing collector [{}]", name()); - super.close(); - } - - @Override - protected void doClose() { - } - - protected String clusterUUID() { - return clusterService.state().metaData().clusterUUID(); - } - - - protected DiscoveryNode localNode() { - return clusterService.localNode(); - } - - protected String monitoringId() { - // Collectors always collects data for Elasticsearch - return MonitoredSystem.ES.getSystem(); - } - - protected String monitoringVersion() { - // Collectors always collects data for the current version of Elasticsearch - return Version.CURRENT.toString(); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java index 92ac962b5ba..a209fc08141 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java @@ -5,14 +5,96 @@ */ package org.elasticsearch.xpack.monitoring.collector; -import org.elasticsearch.common.component.LifecycleComponent; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.monitoring.MonitoringSettings; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import java.util.Collection; -public interface Collector extends LifecycleComponent { +/** + * {@link Collector} are used to collect monitoring data about the cluster, nodes and indices. + */ +public abstract class Collector extends AbstractComponent { - String name(); + private final String name; - Collection collect(); -} \ No newline at end of file + protected final ClusterService clusterService; + protected final MonitoringSettings monitoringSettings; + protected final XPackLicenseState licenseState; + + public Collector(Settings settings, String name, ClusterService clusterService, + MonitoringSettings monitoringSettings, XPackLicenseState licenseState) { + super(settings); + this.name = name; + this.clusterService = clusterService; + this.monitoringSettings = monitoringSettings; + this.licenseState = licenseState; + } + + public String name() { + return name; + } + + @Override + public String toString() { + return name(); + } + + /** + * Indicates if the current collector is allowed to collect data + */ + protected boolean shouldCollect() { + if (licenseState.isMonitoringAllowed() == false) { + logger.trace("collector [{}] can not collect data due to invalid license", name()); + return false; + } + return true; + } + + protected boolean isLocalNodeMaster() { + return clusterService.state().nodes().isLocalNodeElectedMaster(); + } + + public Collection collect() { + try { + if (shouldCollect()) { + logger.trace("collector [{}] - collecting data...", name()); + return doCollect(); + } + } catch (ElasticsearchTimeoutException e) { + logger.error((Supplier) () -> new ParameterizedMessage("collector [{}] timed out when collecting data", name())); + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("collector [{}] failed to collect data", name()), e); + } + return null; + } + + protected abstract Collection doCollect() throws Exception; + + protected String clusterUUID() { + return clusterService.state().metaData().clusterUUID(); + } + + protected DiscoveryNode localNode() { + return clusterService.localNode(); + } + + protected static String monitoringId() { + // Collectors always collects data for Elasticsearch + return MonitoredSystem.ES.getSystem(); + } + + protected static String monitoringVersion() { + // Collectors always collects data for the current version of Elasticsearch + return Version.CURRENT.toString(); + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStateCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStateCollector.java index d1220361581..0a113f79429 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStateCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStateCollector.java @@ -19,7 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.security.InternalClient; @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.security.InternalClient; * This collector runs on the master node only and collects {@link ClusterStateMonitoringDoc} document * at a given frequency. */ -public class ClusterStateCollector extends AbstractCollector { +public class ClusterStateCollector extends Collector { public static final String NAME = "cluster-state-collector"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java index bc83c422e5c..36f65b1a836 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java @@ -21,7 +21,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.XPackFeatureSet; import org.elasticsearch.xpack.action.XPackUsageRequestBuilder; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.security.InternalClient; @@ -40,7 +40,7 @@ import java.util.List; * document; the cluster stats are also indexed in the timestamped index in a * "cluster_stats" document. */ -public class ClusterStatsCollector extends AbstractCollector { +public class ClusterStatsCollector extends Collector { public static final String NAME = "cluster-stats-collector"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java index 31c4370fbd7..911cc3adc0e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java @@ -12,7 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.security.InternalClient; @@ -27,7 +27,7 @@ import java.util.List; * This collector runs on the master node only and collects a {@link IndexRecoveryMonitoringDoc} document * for every index that has on-going shard recoveries. */ -public class IndexRecoveryCollector extends AbstractCollector { +public class IndexRecoveryCollector extends Collector { public static final String NAME = "index-recovery-collector"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java index 3a3542068e9..50b098eccd1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java @@ -14,7 +14,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.security.InternalClient; @@ -29,7 +29,7 @@ import java.util.List; * This collector runs on the master node only and collect a {@link IndexStatsMonitoringDoc} document * for each existing index in the cluster. */ -public class IndexStatsCollector extends AbstractCollector { +public class IndexStatsCollector extends Collector { public static final String NAME = "index-stats-collector"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsCollector.java index 1b71cc79dab..f62a15236d7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsCollector.java @@ -12,7 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.security.InternalClient; @@ -24,7 +24,7 @@ import java.util.Collections; *

* This collector runs on the master node only and collect one {@link IndicesStatsMonitoringDoc} document. */ -public class IndicesStatsCollector extends AbstractCollector { +public class IndicesStatsCollector extends Collector { public static final String NAME = "indices-stats-collector"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java index 97662779e75..410fe6da6e9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java @@ -19,7 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.security.InternalClient; @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.security.InternalClient; * This collector runs on every non-client node and collect * a {@link NodeStatsMonitoringDoc} document for each node of the cluster. */ -public class NodeStatsCollector extends AbstractCollector { +public class NodeStatsCollector extends Collector { public static final String NAME = "node-stats-collector"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java index e6965bedf5a..aae4765a14a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; /** @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; * This collector runs on the master node only and collects the {@link ShardMonitoringDoc} documents * for every index shard. */ -public class ShardsCollector extends AbstractCollector { +public class ShardsCollector extends Collector { public static final String NAME = "shards-collector"; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollectorTestCase.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollectorTestCase.java index feb62c5dd86..4d691227333 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollectorTestCase.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/AbstractCollectorTestCase.java @@ -35,7 +35,7 @@ public abstract class AbstractCollectorTestCase extends MonitoringIntegTestCase return internalCluster().getInstance(InternalClient.class, nodeId); } - protected void assertCanCollect(AbstractCollector collector) { + protected void assertCanCollect(Collector collector) { assertNotNull(collector); assertTrue("collector [" + collector.name() + "] should be able to collect data", collector.shouldCollect()); Collection results = collector.collect(); @@ -43,15 +43,12 @@ public abstract class AbstractCollectorTestCase extends MonitoringIntegTestCase } public void waitForNoBlocksOnNodes() throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - for (String nodeId : internalCluster().getNodeNames()) { - try { - waitForNoBlocksOnNode(nodeId); - } catch (Exception e) { - fail("failed to wait for no blocks on node [" + nodeId + "]: " + e.getMessage()); - } + assertBusy(() -> { + for (String nodeId : internalCluster().getNodeNames()) { + try { + waitForNoBlocksOnNode(nodeId); + } catch (Exception e) { + fail("failed to wait for no blocks on node [" + nodeId + "]: " + e.getMessage()); } } }); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java index 7e676408682..db2d5af14d6 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java @@ -15,7 +15,7 @@ import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.monitoring.MonitoredSystem; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.monitoring.collector.AbstractCollector; +import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.collector.AbstractCollectorTestCase; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; @@ -89,15 +89,4 @@ public class ClusterStatsCollectorTests extends AbstractCollectorTestCase { securedClient(nodeId), internalCluster().getInstance(LicenseService.class, nodeId)); } - - private void assertCanCollect(AbstractCollector collector, Class... classes) { - super.assertCanCollect(collector); - Collection results = collector.collect(); - if (classes != null) { - assertThat(results.size(), equalTo(classes.length)); - for (Class cl : classes) { - assertThat(results.stream().filter(o -> cl.isInstance(o)).count(), equalTo(1L)); - } - } - } }