[Monitoring] Use the same Cluster State for all Collectors (elastic/x-pack-elasticsearch#3216)

This commit changes the Collectors so that they all use the 
same instance of ClusterState.

relates elastic/x-pack-elasticsearch#3156

Original commit: elastic/x-pack-elasticsearch@4f537b026c
This commit is contained in:
Tanguy Leroux 2017-12-12 12:29:40 +01:00 committed by GitHub
parent 0d46e9035c
commit 711254fd24
17 changed files with 170 additions and 133 deletions

View File

@ -161,7 +161,7 @@ public class Monitoring implements ActionPlugin {
collectors.add(new IndexRecoveryCollector(settings, clusterService, licenseState, client));
collectors.add(new JobStatsCollector(settings, clusterService, licenseState, client));
final MonitoringService monitoringService = new MonitoringService(settings, clusterSettings, threadPool, collectors, exporters);
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
return Arrays.asList(monitoringService, exporters, cleanerService);
}

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.monitoring;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -61,6 +63,7 @@ public class MonitoringService extends AbstractLifecycleComponent {
/** Task in charge of collecting and exporting monitoring data **/
private final MonitoringExecution monitor = new MonitoringExecution();
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final Set<Collector> collectors;
private final Exporters exporters;
@ -68,14 +71,15 @@ public class MonitoringService extends AbstractLifecycleComponent {
private volatile TimeValue interval;
private volatile ThreadPool.Cancellable scheduler;
MonitoringService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
MonitoringService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
Set<Collector> collectors, Exporters exporters) {
super(settings);
this.clusterService = Objects.requireNonNull(clusterService);
this.threadPool = Objects.requireNonNull(threadPool);
this.collectors = Objects.requireNonNull(collectors);
this.exporters = Objects.requireNonNull(exporters);
this.interval = INTERVAL.get(settings);
clusterSettings.addSettingsUpdateConsumer(INTERVAL, this::setInterval);
clusterService.getClusterSettings().addSettingsUpdateConsumer(INTERVAL, this::setInterval);
}
void setInterval(TimeValue interval) {
@ -191,6 +195,8 @@ public class MonitoringService extends AbstractLifecycleComponent {
@Override
protected void doRun() throws Exception {
final long timestamp = System.currentTimeMillis();
final long intervalInMillis = interval.getMillis();
final ClusterState clusterState = clusterService.state();
final Collection<MonitoringDoc> results = new ArrayList<>();
for (Collector collector : collectors) {
@ -201,7 +207,7 @@ public class MonitoringService extends AbstractLifecycleComponent {
}
try {
Collection<MonitoringDoc> result = collector.collect(timestamp, interval.getMillis());
Collection<MonitoringDoc> result = collector.collect(timestamp, intervalInMillis, clusterState);
if (result != null) {
results.addAll(result);
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring.collector;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -68,8 +69,10 @@ public abstract class Collector extends AbstractComponent {
/**
* Indicates if the current collector is allowed to collect data
*
* @param isElectedMaster true if the current local node is the elected master node
*/
protected boolean shouldCollect() {
protected boolean shouldCollect(final boolean isElectedMaster) {
if (licenseState.isMonitoringAllowed() == false) {
logger.trace("collector [{}] can not collect data due to invalid license", name());
return false;
@ -77,15 +80,12 @@ public abstract class Collector extends AbstractComponent {
return true;
}
protected boolean isLocalNodeMaster() {
return clusterService.state().nodes().isLocalNodeElectedMaster();
}
public Collection<MonitoringDoc> collect(final long timestamp, final long interval) {
public Collection<MonitoringDoc> collect(final long timestamp, final long interval, final ClusterState clusterState) {
try {
if (shouldCollect()) {
final boolean isElectedMaster = clusterState.getNodes().isLocalNodeElectedMaster();
if (shouldCollect(isElectedMaster)) {
logger.trace("collector [{}] - collecting data...", name());
return doCollect(convertNode(timestamp, clusterService.localNode()), interval);
return doCollect(convertNode(timestamp, clusterService.localNode()), interval, clusterState);
}
} catch (ElasticsearchTimeoutException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] timed out when collecting data", name()));
@ -95,11 +95,9 @@ public abstract class Collector extends AbstractComponent {
return null;
}
protected abstract Collection<MonitoringDoc> doCollect(MonitoringDoc.Node sourceNode, long interval) throws Exception;
protected String clusterUUID() {
return clusterService.state().metaData().clusterUUID();
}
protected abstract Collection<MonitoringDoc> doCollect(MonitoringDoc.Node node,
long interval,
ClusterState clusterState) throws Exception;
/**
* Returns a timestamp to use in {@link MonitoringDoc}
@ -110,6 +108,16 @@ public abstract class Collector extends AbstractComponent {
return System.currentTimeMillis();
}
/**
* Extracts the current cluster's UUID from a {@link ClusterState}
*
* @param clusterState the {@link ClusterState}
* @return the cluster's UUID
*/
protected static String clusterUuid(final ClusterState clusterState) {
return clusterState.metaData().clusterUUID();
}
/**
* Returns the value of the collection timeout configured for the current {@link Collector}.
*

View File

@ -81,13 +81,15 @@ public class ClusterStatsCollector extends Collector {
}
@Override
protected boolean shouldCollect() {
protected boolean shouldCollect(final boolean isElectedMaster) {
// This collector can always collect data on the master node
return isLocalNodeMaster();
return isElectedMaster;
}
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, final long interval) throws Exception {
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final Supplier<ClusterStatsResponse> clusterStatsSupplier =
() -> client.admin().cluster().prepareClusterStats().get(getCollectionTimeout());
final Supplier<List<XPackFeatureSet.Usage>> usageSupplier =
@ -96,8 +98,8 @@ public class ClusterStatsCollector extends Collector {
final ClusterStatsResponse clusterStats = clusterStatsSupplier.get();
final String clusterName = clusterService.getClusterName().value();
final String clusterUuid = clusterUuid(clusterState);
final String version = Version.CURRENT.toString();
final ClusterState clusterState = clusterService.state();
final License license = licenseService.getLicense();
final List<XPackFeatureSet.Usage> xpackUsage = collect(usageSupplier);
final boolean apmIndicesExist = doAPMIndicesExist(clusterState);
@ -108,7 +110,7 @@ public class ClusterStatsCollector extends Collector {
// Adds a cluster stats document
return Collections.singleton(
new ClusterStatsMonitoringDoc(clusterUUID(), timestamp(), interval, node, clusterName, version, clusterStats.getStatus(),
new ClusterStatsMonitoringDoc(clusterUuid, timestamp(), interval, node, clusterName, version, clusterStats.getStatus(),
license, apmIndicesExist, xpackUsage, clusterStats, clusterState, clusterNeedsTLSEnabled));
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring.collector.indices;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -59,12 +60,14 @@ public class IndexRecoveryCollector extends Collector {
}
@Override
protected boolean shouldCollect() {
return super.shouldCollect() && isLocalNodeMaster();
protected boolean shouldCollect(final boolean isElectedMaster) {
return isElectedMaster && super.shouldCollect(isElectedMaster);
}
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, final long interval) throws Exception {
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
List<MonitoringDoc> results = new ArrayList<>(1);
RecoveryResponse recoveryResponse = client.admin().indices().prepareRecoveries()
.setIndices(getCollectionIndices())
@ -73,7 +76,8 @@ public class IndexRecoveryCollector extends Collector {
.get(getCollectionTimeout());
if (recoveryResponse.hasRecoveries()) {
results.add(new IndexRecoveryMonitoringDoc(clusterUUID(), timestamp(), interval, node, recoveryResponse));
final String clusterUuid = clusterUuid(clusterState);
results.add(new IndexRecoveryMonitoringDoc(clusterUuid, timestamp(), interval, node, recoveryResponse));
}
return Collections.unmodifiableCollection(results);
}

View File

@ -49,12 +49,14 @@ public class IndexStatsCollector extends Collector {
}
@Override
protected boolean shouldCollect() {
return super.shouldCollect() && isLocalNodeMaster();
protected boolean shouldCollect(final boolean isElectedMaster) {
return isElectedMaster && super.shouldCollect(isElectedMaster);
}
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, final long interval) throws Exception {
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final List<MonitoringDoc> results = new ArrayList<>();
final IndicesStatsResponse indicesStats = client.admin().indices().prepareStats()
.setIndices(getCollectionIndices())
@ -73,8 +75,7 @@ public class IndexStatsCollector extends Collector {
.get(getCollectionTimeout());
final long timestamp = timestamp();
final String clusterUuid = clusterUUID();
final ClusterState clusterState = clusterService.state();
final String clusterUuid = clusterUuid(clusterState);
// add the indices stats that we use to collect the index stats
results.add(new IndicesStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indicesStats));

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.monitoring.collector.ml;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
@ -57,15 +58,18 @@ public class JobStatsCollector extends Collector {
}
@Override
protected boolean shouldCollect() {
protected boolean shouldCollect(final boolean isElectedMaster) {
// This can only run when monitoring is allowed + ML is enabled/allowed, but also only on the elected master node
return super.shouldCollect() &&
XPackSettings.MACHINE_LEARNING_ENABLED.get(settings) && licenseState.isMachineLearningAllowed() &&
isLocalNodeMaster();
return isElectedMaster
&& super.shouldCollect(isElectedMaster)
&& XPackSettings.MACHINE_LEARNING_ENABLED.get(settings)
&& licenseState.isMachineLearningAllowed();
}
@Override
protected List<MonitoringDoc> doCollect(final MonitoringDoc.Node node, final long interval) throws Exception {
protected List<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
// fetch details about all jobs
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) {
final GetJobsStatsAction.Response jobs =
@ -73,7 +77,7 @@ public class JobStatsCollector extends Collector {
.actionGet(getCollectionTimeout());
final long timestamp = timestamp();
final String clusterUuid = clusterUUID();
final String clusterUuid = clusterUuid(clusterState);
return jobs.getResponse().results().stream()
.map(jobStats -> new JobStatsMonitoringDoc(clusterUuid, timestamp, interval, node, jobStats))

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.bootstrap.BootstrapInfo;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -59,12 +60,14 @@ public class NodeStatsCollector extends Collector {
// For testing purpose
@Override
protected boolean shouldCollect() {
return super.shouldCollect();
protected boolean shouldCollect(final boolean isElectedMaster) {
return super.shouldCollect(isElectedMaster);
}
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, final long interval) throws Exception {
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
NodesStatsRequest request = new NodesStatsRequest("_local");
request.indices(FLAGS);
request.os(true);
@ -81,10 +84,11 @@ public class NodeStatsCollector extends Collector {
throw response.failures().get(0);
}
final String clusterUuid = clusterUuid(clusterState);
final NodeStats nodeStats = response.getNodes().get(0);
return Collections.singletonList(new NodeStatsMonitoringDoc(clusterUUID(), nodeStats.getTimestamp(), interval, node,
node.getUUID(), isLocalNodeMaster(), nodeStats, BootstrapInfo.isMemoryLocked()));
return Collections.singletonList(new NodeStatsMonitoringDoc(clusterUuid, nodeStats.getTimestamp(), interval, node,
node.getUUID(), clusterState.getNodes().isLocalNodeElectedMaster(), nodeStats, BootstrapInfo.isMemoryLocked()));
}
}

View File

@ -38,21 +38,21 @@ public class ShardsCollector extends Collector {
}
@Override
protected boolean shouldCollect() {
return super.shouldCollect() && isLocalNodeMaster();
protected boolean shouldCollect(final boolean isElectedMaster) {
return isElectedMaster && super.shouldCollect(isElectedMaster);
}
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, final long interval) throws Exception {
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final List<MonitoringDoc> results = new ArrayList<>(1);
final ClusterState clusterState = clusterService.state();
if (clusterState != null) {
RoutingTable routingTable = clusterState.routingTable();
if (routingTable != null) {
List<ShardRouting> shards = routingTable.allShards();
if (shards != null) {
final String clusterUUID = clusterUUID();
final String clusterUuid = clusterUuid(clusterState);
final String stateUUID = clusterState.stateUUID();
final long timestamp = timestamp();
@ -66,7 +66,7 @@ public class ShardsCollector extends Collector {
// If the shard is assigned to a node, the shard monitoring document refers to this node
shardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(shard.currentNodeId()));
}
results.add(new ShardMonitoringDoc(clusterUUID, timestamp, interval, shardNode, shard, stateUUID));
results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, shardNode, shard, stateUUID));
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -48,6 +49,7 @@ public class MonitoringServiceTests extends ESTestCase {
final Monitoring monitoring = new Monitoring(Settings.EMPTY, licenseState);
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(monitoring.getSettings()));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
when(clusterService.state()).thenReturn(mock(ClusterState.class));
}
@After
@ -59,7 +61,7 @@ public class MonitoringServiceTests extends ESTestCase {
}
public void testIsMonitoringActive() throws Exception {
monitoringService = new MonitoringService(Settings.EMPTY, clusterSettings, threadPool, emptySet(), new CountingExporter());
monitoringService = new MonitoringService(Settings.EMPTY, clusterService, threadPool, emptySet(), new CountingExporter());
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
@ -82,7 +84,7 @@ public class MonitoringServiceTests extends ESTestCase {
Settings settings = Settings.builder().put(MonitoringService.INTERVAL.getKey(), TimeValue.MINUS_ONE).build();
CountingExporter exporter = new CountingExporter();
monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
monitoringService = new MonitoringService(settings, clusterService, threadPool, emptySet(), exporter);
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
@ -105,7 +107,7 @@ public class MonitoringServiceTests extends ESTestCase {
final BlockingExporter exporter = new BlockingExporter(latch);
Settings settings = Settings.builder().put(MonitoringService.INTERVAL.getKey(), MonitoringService.MIN_INTERVAL).build();
monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
monitoringService = new MonitoringService(settings, clusterService, threadPool, emptySet(), exporter);
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));

View File

@ -59,7 +59,7 @@ public abstract class BaseCollectorTestCase extends ESTestCase {
protected void whenLocalNodeElectedMaster(final boolean electedMaster) {
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.nodes()).thenReturn(nodes);
when(clusterState.getNodes()).thenReturn(nodes);
when(nodes.isLocalNodeElectedMaster()).thenReturn(electedMaster);
}

View File

@ -66,24 +66,17 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
}
public void testShouldCollectReturnsFalseIfNotMaster() {
// this controls the blockage
whenLocalNodeElectedMaster(false);
final ClusterStatsCollector collector =
new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService);
assertThat(collector.shouldCollect(), is(false));
verify(nodes).isLocalNodeElectedMaster();
assertThat(collector.shouldCollect(false), is(false));
}
public void testShouldCollectReturnsTrue() {
whenLocalNodeElectedMaster(true);
final ClusterStatsCollector collector =
new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService);
assertThat(collector.shouldCollect(), is(true));
verify(nodes).isLocalNodeElectedMaster();
assertThat(collector.shouldCollect(true), is(true));
}
public void testDoAPMIndicesExistReturnsBasedOnIndices() {
@ -219,7 +212,7 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> results = collector.doCollect(node, interval);
final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
assertEquals(1, results.size());
final MonitoringDoc monitoringDoc = results.iterator().next();
@ -254,7 +247,8 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
assertThat(document.getClusterState().stateUUID(), equalTo(clusterState.stateUUID()));
verify(clusterService, times(1)).getClusterName();
verify(clusterService, times(2)).state();
verify(clusterState, times(1)).metaData();
verify(metaData, times(1)).clusterUUID();
verify(licenseService, times(1)).getLicense();
verify(clusterAdminClient).prepareClusterStats();
verify(client).execute(same(XPackUsageAction.INSTANCE), any(XPackUsageRequest.class));

View File

@ -44,6 +44,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -52,35 +53,30 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
whenLocalNodeElectedMaster(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final IndexRecoveryCollector collector = new IndexRecoveryCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
when(licenseState.isMonitoringAllowed()).thenReturn(true);
// this controls the blockage
whenLocalNodeElectedMaster(false);
final IndexRecoveryCollector collector = new IndexRecoveryCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
verify(licenseState).isMonitoringAllowed();
verify(nodes).isLocalNodeElectedMaster();
assertThat(collector.shouldCollect(false), is(false));
}
public void testShouldCollectReturnsTrue() {
when(licenseState.isMonitoringAllowed()).thenReturn(true);
whenLocalNodeElectedMaster(true);
final IndexRecoveryCollector collector = new IndexRecoveryCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(true));
assertThat(collector.shouldCollect(true), is(true));
verify(licenseState).isMonitoringAllowed();
verify(nodes).isLocalNodeElectedMaster();
}
public void testDoCollect() throws Exception {
@ -157,8 +153,12 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase {
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> results = collector.doCollect(node, interval);
final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
verify(indicesAdminClient).prepareRecoveries();
if (recoveryStates.isEmpty() == false) {
verify(clusterState).metaData();
verify(metaData).clusterUUID();
}
if (nbRecoveries == 0) {
assertEquals(0, results.size());

View File

@ -36,6 +36,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -44,35 +45,30 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
whenLocalNodeElectedMaster(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final IndexStatsCollector collector = new IndexStatsCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
when(licenseState.isMonitoringAllowed()).thenReturn(true);
// this controls the blockage
whenLocalNodeElectedMaster(false);
final IndexStatsCollector collector = new IndexStatsCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
verify(licenseState).isMonitoringAllowed();
verify(nodes).isLocalNodeElectedMaster();
assertThat(collector.shouldCollect(false), is(false));
}
public void testShouldCollectReturnsTrue() {
when(licenseState.isMonitoringAllowed()).thenReturn(true);
whenLocalNodeElectedMaster(true);
final IndexStatsCollector collector = new IndexStatsCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(true));
assertThat(collector.shouldCollect(true), is(true));
verify(licenseState).isMonitoringAllowed();
verify(nodes).isLocalNodeElectedMaster();
}
public void testDoCollect() throws Exception {
@ -133,8 +129,11 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> results = collector.doCollect(node, interval);
final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
verify(indicesAdminClient).prepareStats();
verify(clusterState, times(1 + indices)).metaData();
verify(clusterState, times(indices)).routingTable();
verify(metaData).clusterUUID();
assertEquals(1 + indices, results.size());

View File

@ -43,6 +43,8 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
final Settings settings = randomFrom(mlEnabledSettings(), mlDisabledSettings());
final boolean mlAllowed = randomBoolean();
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
@ -50,10 +52,11 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
// regardless of ML being enabled
@ -62,13 +65,11 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isMachineLearningAllowed()).thenReturn(randomBoolean());
// this controls the blockage
whenLocalNodeElectedMaster(false);
final boolean isElectedMaster = false;
final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
verify(licenseState).isMonitoringAllowed();
assertThat(collector.shouldCollect(isElectedMaster), is(false));
}
public void testShouldCollectReturnsFalseIfMLIsDisabled() {
@ -77,14 +78,18 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isMachineLearningAllowed()).thenReturn(randomBoolean());
whenLocalNodeElectedMaster(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfMLIsNotAllowed() {
final Settings settings = randomFrom(mlEnabledSettings(), mlDisabledSettings());
@ -92,32 +97,36 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
// this is controls the blockage
when(licenseState.isMachineLearningAllowed()).thenReturn(false);
whenLocalNodeElectedMaster(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsTrue() {
final Settings settings = mlEnabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(true);
when(licenseState.isMachineLearningAllowed()).thenReturn(true);
whenLocalNodeElectedMaster(true);
final boolean isElectedMaster = true;
final JobStatsCollector collector = new JobStatsCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(true));
assertThat(collector.shouldCollect(isElectedMaster), is(true));
verify(licenseState).isMonitoringAllowed();
}
public void testDoCollect() throws Exception {
final MetaData metaData = mock(MetaData.class);
final String clusterUuid = randomAlphaOfLength(5);
whenClusterStateWithUUID(clusterUuid);
final MonitoringDoc.Node node = randomMonitoringNode(random());
final MachineLearningClient client = mock(MachineLearningClient.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
@ -125,10 +134,6 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
withCollectionTimeout(JobStatsCollector.JOB_STATS_TIMEOUT, timeout);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.metaData()).thenReturn(metaData);
when(metaData.clusterUUID()).thenReturn(clusterUuid);
final JobStatsCollector collector = new JobStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
assertEquals(timeout, collector.getCollectionTimeout());
@ -143,7 +148,9 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
final long interval = randomNonNegativeLong();
final List<MonitoringDoc> monitoringDocs = collector.doCollect(node, interval);
final List<MonitoringDoc> monitoringDocs = collector.doCollect(node, interval, clusterState);
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertThat(monitoringDocs, hasSize(jobStats.size()));

View File

@ -40,21 +40,24 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
whenLocalNodeElectedMaster(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final NodeStatsCollector collector = new NodeStatsCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsTrue() {
when(licenseState.isMonitoringAllowed()).thenReturn(true);
whenLocalNodeElectedMaster(true);
final boolean isElectedMaster = true;
final NodeStatsCollector collector = new NodeStatsCollector(Settings.EMPTY, clusterService, licenseState, client);
assertThat(collector.shouldCollect(), is(true));
assertThat(collector.shouldCollect(isElectedMaster), is(true));
verify(licenseState).isMonitoringAllowed();
}
@ -77,7 +80,7 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase {
assertEquals(timeout, collector.getCollectionTimeout());
final FailedNodeException e = expectThrows(FailedNodeException.class, () ->
collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong()));
collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong(), clusterState));
assertEquals(exception, e);
}
@ -112,7 +115,10 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase {
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> results = collector.doCollect(node, interval);
final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertEquals(1, results.size());
final MonitoringDoc monitoringDoc = results.iterator().next();

View File

@ -45,13 +45,16 @@ public class ShardsCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
whenLocalNodeElectedMaster(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState);
assertThat(collector.shouldCollect(), is(false));
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
when(licenseState.isMonitoringAllowed()).thenReturn(true);
@ -60,9 +63,7 @@ public class ShardsCollectorTests extends BaseCollectorTestCase {
final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState);
assertThat(collector.shouldCollect(), is(false));
verify(licenseState).isMonitoringAllowed();
verify(nodes).isLocalNodeElectedMaster();
assertThat(collector.shouldCollect(false), is(false));
}
public void testShouldCollectReturnsTrue() {
@ -71,20 +72,16 @@ public class ShardsCollectorTests extends BaseCollectorTestCase {
final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState);
assertThat(collector.shouldCollect(), is(true));
assertThat(collector.shouldCollect(true), is(true));
verify(licenseState).isMonitoringAllowed();
verify(nodes).isLocalNodeElectedMaster();
}
public void testDoCollectWhenNoClusterState() throws Exception {
when(clusterService.state()).thenReturn(null);
final ShardsCollector collector = new ShardsCollector(Settings.EMPTY, clusterService, licenseState);
final Collection<MonitoringDoc> results = collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong());
final Collection<MonitoringDoc> results = collector.doCollect(randomMonitoringNode(random()), randomNonNegativeLong(), null);
assertThat(results, notNullValue());
assertThat(results.size(), equalTo(0));
verify(clusterService).state();
}
public void testDoCollect() throws Exception {
@ -114,7 +111,10 @@ public class ShardsCollectorTests extends BaseCollectorTestCase {
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> results = collector.doCollect(node, interval);
final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertThat(results, notNullValue());
assertThat(results.size(), equalTo((indices != NONE) ? routingTable.allShards().size() : 0));