diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index 3e3bc5d0af7..85413348ccd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -161,7 +161,7 @@ public class Monitoring implements ActionPlugin { collectors.add(new IndexRecoveryCollector(settings, clusterService, licenseState, client)); collectors.add(new JobStatsCollector(settings, clusterService, licenseState, client)); - final MonitoringService monitoringService = new MonitoringService(settings, clusterSettings, threadPool, collectors, exporters); + final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters); return Arrays.asList(monitoringService, exporters, cleanerService); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java index c7588a09541..30e4df5d953 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.monitoring; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -61,6 +63,7 @@ public class MonitoringService extends AbstractLifecycleComponent { /** Task in charge of collecting and exporting monitoring data **/ private final MonitoringExecution monitor = new MonitoringExecution(); + private final ClusterService clusterService; private final ThreadPool threadPool; private final Set collectors; private final Exporters exporters; @@ -68,14 +71,15 @@ public class MonitoringService extends AbstractLifecycleComponent { private volatile TimeValue interval; private volatile ThreadPool.Cancellable scheduler; - MonitoringService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, + MonitoringService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Set collectors, Exporters exporters) { super(settings); + this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = Objects.requireNonNull(threadPool); this.collectors = Objects.requireNonNull(collectors); this.exporters = Objects.requireNonNull(exporters); this.interval = INTERVAL.get(settings); - clusterSettings.addSettingsUpdateConsumer(INTERVAL, this::setInterval); + clusterService.getClusterSettings().addSettingsUpdateConsumer(INTERVAL, this::setInterval); } void setInterval(TimeValue interval) { @@ -191,6 +195,8 @@ public class MonitoringService extends AbstractLifecycleComponent { @Override protected void doRun() throws Exception { final long timestamp = System.currentTimeMillis(); + final long intervalInMillis = interval.getMillis(); + final ClusterState clusterState = clusterService.state(); final Collection results = new ArrayList<>(); for (Collector collector : collectors) { @@ -201,7 +207,7 @@ public class MonitoringService extends AbstractLifecycleComponent { } try { - Collection result = collector.collect(timestamp, interval.getMillis()); + Collection result = collector.collect(timestamp, intervalInMillis, clusterState); if (result != null) { results.addAll(result); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java index 7c8aacd04a2..b86aa071804 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring.collector; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -68,8 +69,10 @@ public abstract class Collector extends AbstractComponent { /** * Indicates if the current collector is allowed to collect data + * + * @param isElectedMaster true if the current local node is the elected master node */ - protected boolean shouldCollect() { + protected boolean shouldCollect(final boolean isElectedMaster) { if (licenseState.isMonitoringAllowed() == false) { logger.trace("collector [{}] can not collect data due to invalid license", name()); return false; @@ -77,15 +80,12 @@ public abstract class Collector extends AbstractComponent { return true; } - protected boolean isLocalNodeMaster() { - return clusterService.state().nodes().isLocalNodeElectedMaster(); - } - - public Collection collect(final long timestamp, final long interval) { + public Collection collect(final long timestamp, final long interval, final ClusterState clusterState) { try { - if (shouldCollect()) { + final boolean isElectedMaster = clusterState.getNodes().isLocalNodeElectedMaster(); + if (shouldCollect(isElectedMaster)) { logger.trace("collector [{}] - collecting data...", name()); - return doCollect(convertNode(timestamp, clusterService.localNode()), interval); + return doCollect(convertNode(timestamp, clusterService.localNode()), interval, clusterState); } } catch (ElasticsearchTimeoutException e) { logger.error((Supplier) () -> new ParameterizedMessage("collector [{}] timed out when collecting data", name())); @@ -95,11 +95,9 @@ public abstract class Collector extends AbstractComponent { return null; } - protected abstract Collection doCollect(MonitoringDoc.Node sourceNode, long interval) throws Exception; - - protected String clusterUUID() { - return clusterService.state().metaData().clusterUUID(); - } + protected abstract Collection doCollect(MonitoringDoc.Node node, + long interval, + ClusterState clusterState) throws Exception; /** * Returns a timestamp to use in {@link MonitoringDoc} @@ -110,6 +108,16 @@ public abstract class Collector extends AbstractComponent { return System.currentTimeMillis(); } + /** + * Extracts the current cluster's UUID from a {@link ClusterState} + * + * @param clusterState the {@link ClusterState} + * @return the cluster's UUID + */ + protected static String clusterUuid(final ClusterState clusterState) { + return clusterState.metaData().clusterUUID(); + } + /** * Returns the value of the collection timeout configured for the current {@link Collector}. * diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java index 523bcc72f72..04a28c93b47 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java @@ -81,13 +81,15 @@ public class ClusterStatsCollector extends Collector { } @Override - protected boolean shouldCollect() { + protected boolean shouldCollect(final boolean isElectedMaster) { // This collector can always collect data on the master node - return isLocalNodeMaster(); + return isElectedMaster; } @Override - protected Collection doCollect(final MonitoringDoc.Node node, final long interval) throws Exception { + protected Collection doCollect(final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { final Supplier clusterStatsSupplier = () -> client.admin().cluster().prepareClusterStats().get(getCollectionTimeout()); final Supplier> usageSupplier = @@ -96,8 +98,8 @@ public class ClusterStatsCollector extends Collector { final ClusterStatsResponse clusterStats = clusterStatsSupplier.get(); final String clusterName = clusterService.getClusterName().value(); + final String clusterUuid = clusterUuid(clusterState); final String version = Version.CURRENT.toString(); - final ClusterState clusterState = clusterService.state(); final License license = licenseService.getLicense(); final List xpackUsage = collect(usageSupplier); final boolean apmIndicesExist = doAPMIndicesExist(clusterState); @@ -108,7 +110,7 @@ public class ClusterStatsCollector extends Collector { // Adds a cluster stats document return Collections.singleton( - new ClusterStatsMonitoringDoc(clusterUUID(), timestamp(), interval, node, clusterName, version, clusterStats.getStatus(), + new ClusterStatsMonitoringDoc(clusterUuid, timestamp(), interval, node, clusterName, version, clusterStats.getStatus(), license, apmIndicesExist, xpackUsage, clusterStats, clusterState, clusterNeedsTLSEnabled)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java index 34d681be839..06cebf7a857 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring.collector.indices; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -59,12 +60,14 @@ public class IndexRecoveryCollector extends Collector { } @Override - protected boolean shouldCollect() { - return super.shouldCollect() && isLocalNodeMaster(); + protected boolean shouldCollect(final boolean isElectedMaster) { + return isElectedMaster && super.shouldCollect(isElectedMaster); } @Override - protected Collection doCollect(final MonitoringDoc.Node node, final long interval) throws Exception { + protected Collection doCollect(final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { List results = new ArrayList<>(1); RecoveryResponse recoveryResponse = client.admin().indices().prepareRecoveries() .setIndices(getCollectionIndices()) @@ -73,7 +76,8 @@ public class IndexRecoveryCollector extends Collector { .get(getCollectionTimeout()); if (recoveryResponse.hasRecoveries()) { - results.add(new IndexRecoveryMonitoringDoc(clusterUUID(), timestamp(), interval, node, recoveryResponse)); + final String clusterUuid = clusterUuid(clusterState); + results.add(new IndexRecoveryMonitoringDoc(clusterUuid, timestamp(), interval, node, recoveryResponse)); } return Collections.unmodifiableCollection(results); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java index e3ff99d4d19..e64a83dbf5d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java @@ -49,12 +49,14 @@ public class IndexStatsCollector extends Collector { } @Override - protected boolean shouldCollect() { - return super.shouldCollect() && isLocalNodeMaster(); + protected boolean shouldCollect(final boolean isElectedMaster) { + return isElectedMaster && super.shouldCollect(isElectedMaster); } @Override - protected Collection doCollect(final MonitoringDoc.Node node, final long interval) throws Exception { + protected Collection doCollect(final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { final List results = new ArrayList<>(); final IndicesStatsResponse indicesStats = client.admin().indices().prepareStats() .setIndices(getCollectionIndices()) @@ -73,8 +75,7 @@ public class IndexStatsCollector extends Collector { .get(getCollectionTimeout()); final long timestamp = timestamp(); - final String clusterUuid = clusterUUID(); - final ClusterState clusterState = clusterService.state(); + final String clusterUuid = clusterUuid(clusterState); // add the indices stats that we use to collect the index stats results.add(new IndicesStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indicesStats)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java index a5a2c35b905..bf570bd64e3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.monitoring.collector.ml; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; @@ -57,15 +58,18 @@ public class JobStatsCollector extends Collector { } @Override - protected boolean shouldCollect() { + protected boolean shouldCollect(final boolean isElectedMaster) { // This can only run when monitoring is allowed + ML is enabled/allowed, but also only on the elected master node - return super.shouldCollect() && - XPackSettings.MACHINE_LEARNING_ENABLED.get(settings) && licenseState.isMachineLearningAllowed() && - isLocalNodeMaster(); + return isElectedMaster + && super.shouldCollect(isElectedMaster) + && XPackSettings.MACHINE_LEARNING_ENABLED.get(settings) + && licenseState.isMachineLearningAllowed(); } @Override - protected List doCollect(final MonitoringDoc.Node node, final long interval) throws Exception { + protected List doCollect(final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { // fetch details about all jobs try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) { final GetJobsStatsAction.Response jobs = @@ -73,7 +77,7 @@ public class JobStatsCollector extends Collector { .actionGet(getCollectionTimeout()); final long timestamp = timestamp(); - final String clusterUuid = clusterUUID(); + final String clusterUuid = clusterUuid(clusterState); return jobs.getResponse().results().stream() .map(jobStats -> new JobStatsMonitoringDoc(clusterUuid, timestamp, interval, node, jobStats)) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java index c41905a72a6..c67a08fda5a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.bootstrap.BootstrapInfo; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -59,12 +60,14 @@ public class NodeStatsCollector extends Collector { // For testing purpose @Override - protected boolean shouldCollect() { - return super.shouldCollect(); + protected boolean shouldCollect(final boolean isElectedMaster) { + return super.shouldCollect(isElectedMaster); } @Override - protected Collection doCollect(final MonitoringDoc.Node node, final long interval) throws Exception { + protected Collection doCollect(final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { NodesStatsRequest request = new NodesStatsRequest("_local"); request.indices(FLAGS); request.os(true); @@ -81,10 +84,11 @@ public class NodeStatsCollector extends Collector { throw response.failures().get(0); } + final String clusterUuid = clusterUuid(clusterState); final NodeStats nodeStats = response.getNodes().get(0); - return Collections.singletonList(new NodeStatsMonitoringDoc(clusterUUID(), nodeStats.getTimestamp(), interval, node, - node.getUUID(), isLocalNodeMaster(), nodeStats, BootstrapInfo.isMemoryLocked())); + return Collections.singletonList(new NodeStatsMonitoringDoc(clusterUuid, nodeStats.getTimestamp(), interval, node, + node.getUUID(), clusterState.getNodes().isLocalNodeElectedMaster(), nodeStats, BootstrapInfo.isMemoryLocked())); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java index 1257f4bdcc9..f1a23d46d2c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java @@ -38,21 +38,21 @@ public class ShardsCollector extends Collector { } @Override - protected boolean shouldCollect() { - return super.shouldCollect() && isLocalNodeMaster(); + protected boolean shouldCollect(final boolean isElectedMaster) { + return isElectedMaster && super.shouldCollect(isElectedMaster); } @Override - protected Collection doCollect(final MonitoringDoc.Node node, final long interval) throws Exception { + protected Collection doCollect(final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { final List results = new ArrayList<>(1); - - final ClusterState clusterState = clusterService.state(); if (clusterState != null) { RoutingTable routingTable = clusterState.routingTable(); if (routingTable != null) { List shards = routingTable.allShards(); if (shards != null) { - final String clusterUUID = clusterUUID(); + final String clusterUuid = clusterUuid(clusterState); final String stateUUID = clusterState.stateUUID(); final long timestamp = timestamp(); @@ -66,7 +66,7 @@ public class ShardsCollector extends Collector { // If the shard is assigned to a node, the shard monitoring document refers to this node shardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(shard.currentNodeId())); } - results.add(new ShardMonitoringDoc(clusterUUID, timestamp, interval, shardNode, shard, stateUUID)); + results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, shardNode, shard, stateUUID)); } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java index 25d33890e12..34d3591dc4c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.monitoring; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -48,6 +49,7 @@ public class MonitoringServiceTests extends ESTestCase { final Monitoring monitoring = new Monitoring(Settings.EMPTY, licenseState); clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(monitoring.getSettings())); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(mock(ClusterState.class)); } @After @@ -59,7 +61,7 @@ public class MonitoringServiceTests extends ESTestCase { } public void testIsMonitoringActive() throws Exception { - monitoringService = new MonitoringService(Settings.EMPTY, clusterSettings, threadPool, emptySet(), new CountingExporter()); + monitoringService = new MonitoringService(Settings.EMPTY, clusterService, threadPool, emptySet(), new CountingExporter()); monitoringService.start(); assertBusy(() -> assertTrue(monitoringService.isStarted())); @@ -82,7 +84,7 @@ public class MonitoringServiceTests extends ESTestCase { Settings settings = Settings.builder().put(MonitoringService.INTERVAL.getKey(), TimeValue.MINUS_ONE).build(); CountingExporter exporter = new CountingExporter(); - monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter); + monitoringService = new MonitoringService(settings, clusterService, threadPool, emptySet(), exporter); monitoringService.start(); assertBusy(() -> assertTrue(monitoringService.isStarted())); @@ -105,7 +107,7 @@ public class MonitoringServiceTests extends ESTestCase { final BlockingExporter exporter = new BlockingExporter(latch); Settings settings = Settings.builder().put(MonitoringService.INTERVAL.getKey(), MonitoringService.MIN_INTERVAL).build(); - monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter); + monitoringService = new MonitoringService(settings, clusterService, threadPool, emptySet(), exporter); monitoringService.start(); assertBusy(() -> assertTrue(monitoringService.isStarted())); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/BaseCollectorTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/BaseCollectorTestCase.java index 07e2cc7751f..8ae67bb8a3a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/BaseCollectorTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/BaseCollectorTestCase.java @@ -59,7 +59,7 @@ public abstract class BaseCollectorTestCase extends ESTestCase { protected void whenLocalNodeElectedMaster(final boolean electedMaster) { when(clusterService.state()).thenReturn(clusterState); - when(clusterState.nodes()).thenReturn(nodes); + when(clusterState.getNodes()).thenReturn(nodes); when(nodes.isLocalNodeElectedMaster()).thenReturn(electedMaster); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java index 929c8dd5e9e..9bfe97b1249 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java @@ -66,24 +66,17 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase { } public void testShouldCollectReturnsFalseIfNotMaster() { - // this controls the blockage - whenLocalNodeElectedMaster(false); - final ClusterStatsCollector collector = new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService); - assertThat(collector.shouldCollect(), is(false)); - verify(nodes).isLocalNodeElectedMaster(); + assertThat(collector.shouldCollect(false), is(false)); } public void testShouldCollectReturnsTrue() { - whenLocalNodeElectedMaster(true); - final ClusterStatsCollector collector = new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService); - assertThat(collector.shouldCollect(), is(true)); - verify(nodes).isLocalNodeElectedMaster(); + assertThat(collector.shouldCollect(true), is(true)); } public void testDoAPMIndicesExistReturnsBasedOnIndices() { @@ -219,7 +212,7 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase { final long interval = randomNonNegativeLong(); - final Collection results = collector.doCollect(node, interval); + final Collection results = collector.doCollect(node, interval, clusterState); assertEquals(1, results.size()); final MonitoringDoc monitoringDoc = results.iterator().next(); @@ -254,7 +247,8 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase { assertThat(document.getClusterState().stateUUID(), equalTo(clusterState.stateUUID())); verify(clusterService, times(1)).getClusterName(); - verify(clusterService, times(2)).state(); + verify(clusterState, times(1)).metaData(); + verify(metaData, times(1)).clusterUUID(); verify(licenseService, times(1)).getLicense(); verify(clusterAdminClient).prepareClusterStats(); verify(client).execute(same(XPackUsageAction.INSTANCE), any(XPackUsageRequest.class)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollectorTests.java index afbc4660c92..ca09d88fd12 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollectorTests.java @@ -44,6 +44,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -52,35 +53,30 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase { public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { // this controls the blockage when(licenseState.isMonitoringAllowed()).thenReturn(false); - whenLocalNodeElectedMaster(randomBoolean()); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); final IndexRecoveryCollector collector = new IndexRecoveryCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsFalseIfNotMaster() { when(licenseState.isMonitoringAllowed()).thenReturn(true); - // this controls the blockage - whenLocalNodeElectedMaster(false); - final IndexRecoveryCollector collector = new IndexRecoveryCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); - verify(nodes).isLocalNodeElectedMaster(); + assertThat(collector.shouldCollect(false), is(false)); } public void testShouldCollectReturnsTrue() { when(licenseState.isMonitoringAllowed()).thenReturn(true); - whenLocalNodeElectedMaster(true); - final IndexRecoveryCollector collector = new IndexRecoveryCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(true)); + assertThat(collector.shouldCollect(true), is(true)); verify(licenseState).isMonitoringAllowed(); - verify(nodes).isLocalNodeElectedMaster(); } public void testDoCollect() throws Exception { @@ -157,8 +153,12 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase { final long interval = randomNonNegativeLong(); - final Collection results = collector.doCollect(node, interval); + final Collection results = collector.doCollect(node, interval, clusterState); verify(indicesAdminClient).prepareRecoveries(); + if (recoveryStates.isEmpty() == false) { + verify(clusterState).metaData(); + verify(metaData).clusterUUID(); + } if (nbRecoveries == 0) { assertEquals(0, results.size()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java index f7528315d91..f61497798a5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java @@ -36,6 +36,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -44,35 +45,30 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase { public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { // this controls the blockage when(licenseState.isMonitoringAllowed()).thenReturn(false); - whenLocalNodeElectedMaster(randomBoolean()); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); final IndexStatsCollector collector = new IndexStatsCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsFalseIfNotMaster() { when(licenseState.isMonitoringAllowed()).thenReturn(true); - // this controls the blockage - whenLocalNodeElectedMaster(false); - final IndexStatsCollector collector = new IndexStatsCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); - verify(nodes).isLocalNodeElectedMaster(); + assertThat(collector.shouldCollect(false), is(false)); } public void testShouldCollectReturnsTrue() { when(licenseState.isMonitoringAllowed()).thenReturn(true); - whenLocalNodeElectedMaster(true); - final IndexStatsCollector collector = new IndexStatsCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(true)); + assertThat(collector.shouldCollect(true), is(true)); verify(licenseState).isMonitoringAllowed(); - verify(nodes).isLocalNodeElectedMaster(); } public void testDoCollect() throws Exception { @@ -133,8 +129,11 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase { final long interval = randomNonNegativeLong(); - final Collection results = collector.doCollect(node, interval); + final Collection results = collector.doCollect(node, interval, clusterState); verify(indicesAdminClient).prepareStats(); + verify(clusterState, times(1 + indices)).metaData(); + verify(clusterState, times(indices)).routingTable(); + verify(metaData).clusterUUID(); assertEquals(1 + indices, results.size()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollectorTests.java index 0909741ffd4..2948ed92f3a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollectorTests.java @@ -43,6 +43,8 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { final Settings settings = randomFrom(mlEnabledSettings(), mlDisabledSettings()); final boolean mlAllowed = randomBoolean(); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); // this controls the blockage when(licenseState.isMonitoringAllowed()).thenReturn(false); @@ -50,9 +52,10 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - - verify(licenseState).isMonitoringAllowed(); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsFalseIfNotMaster() { @@ -62,13 +65,11 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); when(licenseState.isMachineLearningAllowed()).thenReturn(randomBoolean()); // this controls the blockage - whenLocalNodeElectedMaster(false); + final boolean isElectedMaster = false; final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - - verify(licenseState).isMonitoringAllowed(); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); } public void testShouldCollectReturnsFalseIfMLIsDisabled() { @@ -77,13 +78,17 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); when(licenseState.isMachineLearningAllowed()).thenReturn(randomBoolean()); - whenLocalNodeElectedMaster(randomBoolean()); + + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); - verify(licenseState).isMonitoringAllowed(); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsFalseIfMLIsNotAllowed() { @@ -92,13 +97,16 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); // this is controls the blockage when(licenseState.isMachineLearningAllowed()).thenReturn(false); - whenLocalNodeElectedMaster(randomBoolean()); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); - verify(licenseState).isMonitoringAllowed(); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsTrue() { @@ -106,18 +114,19 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { when(licenseState.isMonitoringAllowed()).thenReturn(true); when(licenseState.isMachineLearningAllowed()).thenReturn(true); - whenLocalNodeElectedMaster(true); + final boolean isElectedMaster = true; final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(true)); + assertThat(collector.shouldCollect(isElectedMaster), is(true)); verify(licenseState).isMonitoringAllowed(); } public void testDoCollect() throws Exception { - final MetaData metaData = mock(MetaData.class); final String clusterUuid = randomAlphaOfLength(5); + whenClusterStateWithUUID(clusterUuid); + final MonitoringDoc.Node node = randomMonitoringNode(random()); final MachineLearningClient client = mock(MachineLearningClient.class); final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); @@ -125,10 +134,6 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120)); withCollectionTimeout(JobStatsCollector.JOB_STATS_TIMEOUT, timeout); - when(clusterService.state()).thenReturn(clusterState); - when(clusterState.metaData()).thenReturn(metaData); - when(metaData.clusterUUID()).thenReturn(clusterUuid); - final JobStatsCollector collector = new JobStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext); assertEquals(timeout, collector.getCollectionTimeout()); @@ -143,7 +148,9 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase { final long interval = randomNonNegativeLong(); - final List monitoringDocs = collector.doCollect(node, interval); + final List monitoringDocs = collector.doCollect(node, interval, clusterState); + verify(clusterState).metaData(); + verify(metaData).clusterUUID(); assertThat(monitoringDocs, hasSize(jobStats.size())); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollectorTests.java index b6024087a19..2c9449627a0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollectorTests.java @@ -40,21 +40,24 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase { public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { // this controls the blockage when(licenseState.isMonitoringAllowed()).thenReturn(false); - whenLocalNodeElectedMaster(randomBoolean()); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); final NodeStatsCollector collector = new NodeStatsCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsTrue() { when(licenseState.isMonitoringAllowed()).thenReturn(true); - whenLocalNodeElectedMaster(true); + final boolean isElectedMaster = true; final NodeStatsCollector collector = new NodeStatsCollector(Settings.EMPTY, clusterService, licenseState, client); - assertThat(collector.shouldCollect(), is(true)); + assertThat(collector.shouldCollect(isElectedMaster), is(true)); verify(licenseState).isMonitoringAllowed(); } @@ -77,7 +80,7 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase { assertEquals(timeout, collector.getCollectionTimeout()); final FailedNodeException e = expectThrows(FailedNodeException.class, () -> - collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong())); + collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong(), clusterState)); assertEquals(exception, e); } @@ -112,7 +115,10 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase { final long interval = randomNonNegativeLong(); - final Collection results = collector.doCollect(node, interval); + final Collection results = collector.doCollect(node, interval, clusterState); + verify(clusterState).metaData(); + verify(metaData).clusterUUID(); + assertEquals(1, results.size()); final MonitoringDoc monitoringDoc = results.iterator().next(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java index 2b8642e0d94..b8753a82a1d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java @@ -45,12 +45,15 @@ public class ShardsCollectorTests extends BaseCollectorTestCase { public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { // this controls the blockage when(licenseState.isMonitoringAllowed()).thenReturn(false); - whenLocalNodeElectedMaster(randomBoolean()); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } } public void testShouldCollectReturnsFalseIfNotMaster() { @@ -60,9 +63,7 @@ public class ShardsCollectorTests extends BaseCollectorTestCase { final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState); - assertThat(collector.shouldCollect(), is(false)); - verify(licenseState).isMonitoringAllowed(); - verify(nodes).isLocalNodeElectedMaster(); + assertThat(collector.shouldCollect(false), is(false)); } public void testShouldCollectReturnsTrue() { @@ -71,20 +72,16 @@ public class ShardsCollectorTests extends BaseCollectorTestCase { final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState); - assertThat(collector.shouldCollect(), is(true)); + assertThat(collector.shouldCollect(true), is(true)); verify(licenseState).isMonitoringAllowed(); - verify(nodes).isLocalNodeElectedMaster(); } public void testDoCollectWhenNoClusterState() throws Exception { - when(clusterService.state()).thenReturn(null); - final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState); - final Collection results = collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong()); + final Collection results = collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong(), null); assertThat(results, notNullValue()); assertThat(results.size(), equalTo(0)); - verify(clusterService).state(); } public void testDoCollect() throws Exception { @@ -114,7 +111,10 @@ public class ShardsCollectorTests extends BaseCollectorTestCase { final long interval = randomNonNegativeLong(); - final Collection results = collector.doCollect(node, interval); + final Collection results = collector.doCollect(node, interval, clusterState); + verify(clusterState).metaData(); + verify(metaData).clusterUUID(); + assertThat(results, notNullValue()); assertThat(results.size(), equalTo((indices != NONE) ? routingTable.allShards().size() : 0));