[Monitoring] Schedule data collection instead of sleeping (elastic/elasticsearch#4266)

This commit renames the current AgentService into MonitoringService and changes the way it works: it was previously based on thread sleeping and it now use thread scheduling instead. 

At every given time interval, a MonitoringExecution is executed. It first checks if monitoring data can be collected and if so it will collect data from Collectors and then export the data using the Exporters. There are cases where the data cannot be collected: when the service is stopping, when the interval has been set to -1 after the MonitoringExecution has been scheduled, or when the previous data collection is not yet terminated. In this last case, MonitoringExecution will still be executed at the given interval but will not collect any data.

All tasks are executed on the generic thread pool.

closes elastic/elasticsearch#2866

Original commit: elastic/x-pack-elasticsearch@d37b4d3731
This commit is contained in:
Tanguy Leroux 2017-01-05 17:45:17 +01:00 committed by GitHub
parent 3d2d1d49b6
commit b72dd8a2d1
20 changed files with 487 additions and 284 deletions

View File

@ -239,3 +239,9 @@ thirdPartyAudit.excludes = [
'javax.activation.UnsupportedDataTypeException' 'javax.activation.UnsupportedDataTypeException'
] ]
run {
setting 'xpack.graph.enabled', 'true'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.enabled', 'true'
setting 'xpack.watcher.enabled', 'true'
}

View File

@ -1,230 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
/**
* The {@code AgentService} is a service that does the work of publishing the details to the monitoring cluster.
* <p>
* If this service is stopped, then the attached, monitored node is not going to publish its details to the monitoring cluster. Given
* service life cycles, the intended way to temporarily stop the publishing is using the start and stop collection methods.
*
* @see #stopCollection()
* @see #startCollection()
*/
public class AgentService extends AbstractLifecycleComponent {
private volatile ExportingWorker exportingWorker;
private volatile Thread workerThread;
private volatile long samplingIntervalMillis;
private final Collection<Collector> collectors;
private final String[] settingsCollectors;
private final Exporters exporters;
public AgentService(Settings settings, ClusterSettings clusterSettings, Set<Collector> collectors, Exporters exporters) {
super(settings);
this.samplingIntervalMillis = MonitoringSettings.INTERVAL.get(settings).millis();
this.settingsCollectors = MonitoringSettings.COLLECTORS.get(settings).toArray(new String[0]);
this.collectors = Collections.unmodifiableSet(filterCollectors(collectors, settingsCollectors));
this.exporters = exporters;
clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.INTERVAL, this::setInterval);
}
private void setInterval(TimeValue interval) {
this.samplingIntervalMillis = interval.millis();
applyIntervalSettings();
}
protected Set<Collector> filterCollectors(Set<Collector> collectors, String[] filters) {
if (CollectionUtils.isEmpty(filters)) {
return collectors;
}
Set<Collector> list = new HashSet<>();
for (Collector collector : collectors) {
if (Regex.simpleMatch(filters, collector.name().toLowerCase(Locale.ROOT))) {
list.add(collector);
} else if (collector instanceof ClusterStatsCollector) {
list.add(collector);
}
}
return list;
}
protected void applyIntervalSettings() {
if (samplingIntervalMillis <= 0) {
logger.info("data sampling is disabled due to interval settings [{}]", samplingIntervalMillis);
if (workerThread != null) {
// notify worker to stop on its leisure, not to disturb an exporting operation
exportingWorker.closed = true;
exportingWorker = null;
workerThread = null;
}
} else if (workerThread == null || !workerThread.isAlive()) {
exportingWorker = new ExportingWorker();
workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "monitoring.exporters"));
workerThread.setDaemon(true);
workerThread.start();
}
}
/** stop collection and exporting. this method blocks until all background activity is guaranteed to be stopped */
public void stopCollection() {
final ExportingWorker worker = this.exportingWorker;
if (worker != null) {
worker.stopCollecting();
}
}
public void startCollection() {
final ExportingWorker worker = this.exportingWorker;
if (worker != null) {
worker.collecting = true;
}
}
@Override
protected void doStart() {
logger.debug("monitoring service started");
exporters.start();
applyIntervalSettings();
}
@Override
protected void doStop() {
if (workerThread != null && workerThread.isAlive()) {
exportingWorker.closed = true;
workerThread.interrupt();
try {
workerThread.join(60000);
} catch (InterruptedException e) {
// we don't care...
}
}
exporters.stop();
}
@Override
protected void doClose() {
for (Exporter exporter : exporters) {
try {
exporter.close();
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to close exporter [{}]", exporter.name()), e);
}
}
}
public TimeValue getSamplingInterval() {
return TimeValue.timeValueMillis(samplingIntervalMillis);
}
public String[] collectors() {
return settingsCollectors;
}
class ExportingWorker implements Runnable {
volatile boolean closed = false;
volatile boolean collecting = true;
final ReleasableLock collectionLock = new ReleasableLock(new ReentrantLock(false));
@Override
public void run() {
while (!closed) {
// sleep first to allow node to complete initialization before collecting the first start
try {
Thread.sleep(samplingIntervalMillis);
if (closed) {
continue;
}
try (Releasable ignore = collectionLock.acquire()) {
Collection<MonitoringDoc> docs = collect();
if ((docs.isEmpty() == false) && (closed == false)) {
exporters.export(docs);
}
}
} catch (ExportException e) {
logger.error("exception when exporting documents", e);
} catch (InterruptedException e) {
logger.trace("interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("background thread had an uncaught exception", e);
}
}
logger.debug("worker shutdown");
}
/** stop collection and exporting. this method will be block until background collection is actually stopped */
public void stopCollecting() {
collecting = false;
collectionLock.acquire().close();
}
private Collection<MonitoringDoc> collect() {
if (logger.isTraceEnabled()) {
logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
}
Collection<MonitoringDoc> docs = new ArrayList<>();
for (Collector collector : collectors) {
if (collecting) {
Collection<MonitoringDoc> result = collector.collect();
if (result != null) {
logger.trace("adding [{}] collected docs from [{}] collector", result.size(), collector.name());
docs.addAll(result);
} else {
logger.trace("skipping collected docs from [{}] collector", collector.name());
}
}
if (closed) {
// Stop collecting if the worker is marked as closed
break;
}
}
return docs;
}
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.ssl.SSLService;
import java.time.Clock;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -119,9 +120,10 @@ public class Monitoring implements ActionPlugin {
collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState)); collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState));
collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client)); collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client)); collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client));
final AgentService agentService = new AgentService(settings, clusterSettings, collectors, exporters); final MonitoringService monitoringService =
new MonitoringService(settings, clusterSettings, threadPool, collectors, exporters);
return Arrays.asList(agentService, monitoringSettings, exporters, cleanerService); return Arrays.asList(monitoringService, monitoringSettings, exporters, cleanerService);
} }
@Override @Override

View File

@ -0,0 +1,236 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The {@code MonitoringService} is a service that does the work of publishing the details to the monitoring cluster.
* <p>
* If this service is stopped, then the attached, monitored node is not going to publish its details to the monitoring cluster. Given
* service life cycles, the intended way to temporarily stop the publishing is using the start and stop methods.
*/
public class MonitoringService extends AbstractLifecycleComponent {
/** State of the monitoring service, either started or stopped **/
private final AtomicBoolean started = new AtomicBoolean(false);
/** Task in charge of collecting and exporting monitoring data **/
private final MonitoringExecution monitor = new MonitoringExecution();
private final ThreadPool threadPool;
private final Set<Collector> collectors;
private final Exporters exporters;
private volatile TimeValue interval;
private volatile ThreadPool.Cancellable scheduler;
MonitoringService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Set<Collector> collectors, Exporters exporters) {
super(settings);
this.threadPool = Objects.requireNonNull(threadPool);
this.collectors = Objects.requireNonNull(collectors);
this.exporters = Objects.requireNonNull(exporters);
this.interval = MonitoringSettings.INTERVAL.get(settings);
clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.INTERVAL, this::setInterval);
}
void setInterval(TimeValue interval) {
this.interval = interval;
scheduleExecution();
}
public TimeValue getInterval() {
return interval;
}
boolean isMonitoringActive() {
return isStarted()
&& interval != null
&& interval.millis() >= MonitoringSettings.MIN_INTERVAL.millis();
}
private String threadPoolName() {
return ThreadPool.Names.GENERIC;
}
boolean isStarted() {
return started.get();
}
@Override
protected void doStart() {
if (started.compareAndSet(false, true)) {
try {
logger.debug("monitoring service is starting");
scheduleExecution();
logger.debug("monitoring service started");
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to start monitoring service"), e);
started.set(false);
throw e;
}
}
}
@Override
protected void doStop() {
if (started.getAndSet(false)) {
logger.debug("monitoring service is stopping");
cancelExecution();
logger.debug("monitoring service stopped");
}
}
@Override
protected void doClose() {
logger.debug("monitoring service is closing");
closeExecution();
for (Exporter exporter : exporters) {
try {
exporter.close();
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to close exporter [{}]", exporter.name()), e);
}
}
logger.debug("monitoring service closed");
}
void scheduleExecution() {
if (scheduler != null) {
cancelExecution();
}
if (isMonitoringActive()) {
scheduler = threadPool.scheduleWithFixedDelay(monitor, interval, threadPoolName());
}
}
void cancelExecution() {
if (scheduler != null) {
try {
scheduler.cancel();
} finally {
scheduler = null;
}
}
}
void closeExecution() {
try {
monitor.close();
} catch (IOException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to close monitoring execution"), e);
}
}
/**
* {@link MonitoringExecution} is a scheduled {@link Runnable} that periodically checks if monitoring
* data can be collected and exported. It runs at a given interval corresponding to the monitoring
* sampling interval. It first checks if monitoring is still enabled (because it might have changed
* since the last time the task was scheduled: interval set to -1 or the monitoring service is stopped).
* Since collecting and exporting data can take time, it uses a semaphore to track the current execution.
*/
class MonitoringExecution extends AbstractRunnable implements Closeable {
/**
* Binary semaphore used to wait for monitoring execution to terminate before closing or stopping
* the monitoring service. A semaphore is preferred over a ReentrantLock because the lock is
* obtained by a thread and released by another thread.
**/
private final Semaphore semaphore = new Semaphore(1);
@Override
public void doRun() {
if (isMonitoringActive() == false) {
logger.debug("monitoring execution is skipped");
return;
}
if (semaphore.tryAcquire() == false) {
logger.debug("monitoring execution is skipped until previous execution terminated");
return;
}
threadPool.executor(threadPoolName()).submit(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
Collection<MonitoringDoc> results = new ArrayList<>();
for (Collector collector : collectors) {
if (isStarted() == false) {
// Do not collect more data if the the monitoring service is stopping
// otherwise some collectors might just fail.
return;
}
try {
Collection<MonitoringDoc> result = collector.collect();
if (result != null) {
results.addAll(result);
}
} catch (Exception e) {
logger.warn((Supplier<?>) () ->
new ParameterizedMessage("monitoring collector [{}] failed to collect data", collector.name()), e);
}
}
if (isMonitoringActive()) {
exporters.export(results);
}
}
@Override
public void onFailure(Exception e) {
logger.warn("monitoring execution failed", e);
}
@Override
public void onRejection(Exception e) {
logger.warn("monitoring execution has been rejected", e);
}
@Override
public void onAfter() {
semaphore.release();
}
});
}
@Override
public void onFailure(Exception e) {
logger.warn("monitoring execution failed", e);
}
@Override
public void close() throws IOException {
try {
// Block until the lock can be acquired
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.xpack.monitoring; package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
@ -34,11 +33,23 @@ public class MonitoringSettings extends AbstractComponent {
*/ */
public static final TimeValue HISTORY_DURATION_MINIMUM = TimeValue.timeValueHours(24); public static final TimeValue HISTORY_DURATION_MINIMUM = TimeValue.timeValueHours(24);
/**
* Minimum value for sampling interval (1 second)
*/
static final TimeValue MIN_INTERVAL = TimeValue.timeValueSeconds(1L);
/** /**
* Sampling interval between two collections (default to 10s) * Sampling interval between two collections (default to 10s)
*/ */
public static final Setting<TimeValue> INTERVAL = public static final Setting<TimeValue> INTERVAL = new Setting<>(collectionKey("interval"), "10s",
timeSetting(collectionKey("interval"), TimeValue.timeValueSeconds(10), Property.Dynamic, Property.NodeScope); (s) -> {
TimeValue value = TimeValue.parseTimeValue(s, null, collectionKey("interval"));
if (TimeValue.MINUS_ONE.equals(value) || value.millis() >= MIN_INTERVAL.millis()) {
return value;
}
throw new IllegalArgumentException("Failed to parse monitoring interval [" + s + "], value must be >= " + MIN_INTERVAL);
},
Property.Dynamic, Property.NodeScope);
/** /**
* Timeout value when collecting index statistics (default to 10m) * Timeout value when collecting index statistics (default to 10m)

View File

@ -20,14 +20,13 @@ import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = TEST, transportClientRatio = 0, numClientNodes = 0, numDataNodes = 0) @ClusterScope(scope = TEST, transportClientRatio = 0, numClientNodes = 0, numDataNodes = 0)
public class MonitoringPluginTests extends MonitoringIntegTestCase { public class MonitoringPluginTests extends MonitoringIntegTestCase {
@Override @Override
protected void startCollection() { protected void startMonitoringService() {
// do nothing as monitoring is sometime unbound // do nothing as monitoring is sometime unbound
} }
@Override @Override
protected void stopCollection() { protected void stopMonitoringService() {
// do nothing as monitoring is sometime unbound // do nothing as monitoring is sometime unbound
} }
@ -44,7 +43,7 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
.put(XPackSettings.MONITORING_ENABLED.getKey(), true) .put(XPackSettings.MONITORING_ENABLED.getKey(), true)
.build()); .build());
assertPluginIsLoaded(); assertPluginIsLoaded();
assertServiceIsBound(AgentService.class); assertServiceIsBound(MonitoringService.class);
} }
public void testMonitoringDisabled() { public void testMonitoringDisabled() {
@ -52,7 +51,7 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
.put(XPackSettings.MONITORING_ENABLED.getKey(), false) .put(XPackSettings.MONITORING_ENABLED.getKey(), false)
.build()); .build());
assertPluginIsLoaded(); assertPluginIsLoaded();
assertServiceIsNotBound(AgentService.class); assertServiceIsNotBound(MonitoringService.class);
} }
public void testMonitoringEnabledOnTribeNode() { public void testMonitoringEnabledOnTribeNode() {
@ -61,13 +60,13 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
.put("tribe.name", "t1") .put("tribe.name", "t1")
.build()); .build());
assertPluginIsLoaded(); assertPluginIsLoaded();
assertServiceIsBound(AgentService.class); assertServiceIsBound(MonitoringService.class);
} }
public void testMonitoringDisabledOnTribeNode() { public void testMonitoringDisabledOnTribeNode() {
internalCluster().startNode(Settings.builder().put("tribe.name", "t1").build()); internalCluster().startNode(Settings.builder().put("tribe.name", "t1").build());
assertPluginIsLoaded(); assertPluginIsLoaded();
assertServiceIsNotBound(AgentService.class); assertServiceIsNotBound(MonitoringService.class);
} }
private void assertPluginIsLoaded() { private void assertPluginIsLoaded() {

View File

@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.junit.After;
import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MonitoringServiceTests extends ESTestCase {
TestThreadPool threadPool;
MonitoringService monitoringService;
ClusterService clusterService;
ClusterSettings clusterSettings;
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getTestName());
clusterService = mock(ClusterService.class);
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(MonitoringSettings.getSettings()));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
}
@After
public void terminate() throws Exception {
if (monitoringService != null) {
monitoringService.close();
}
terminate(threadPool);
}
public void testIsMonitoringActive() throws Exception {
monitoringService = new MonitoringService(Settings.EMPTY, clusterSettings, threadPool, emptySet(), new CountingExporter());
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
assertTrue(monitoringService.isMonitoringActive());
monitoringService.stop();
assertBusy(() -> assertFalse(monitoringService.isStarted()));
assertFalse(monitoringService.isMonitoringActive());
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
assertTrue(monitoringService.isMonitoringActive());
monitoringService.close();
assertBusy(() -> assertFalse(monitoringService.isStarted()));
assertFalse(monitoringService.isMonitoringActive());
}
public void testInterval() throws Exception {
Settings settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), TimeValue.MINUS_ONE).build();
CountingExporter exporter = new CountingExporter();
monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
assertFalse("interval -1 does not start the monitoring execution", monitoringService.isMonitoringActive());
assertEquals(0, exporter.getExportsCount());
monitoringService.setInterval(TimeValue.timeValueSeconds(1));
assertTrue(monitoringService.isMonitoringActive());
assertBusy(() -> assertThat(exporter.getExportsCount(), greaterThan(0)));
monitoringService.setInterval(TimeValue.timeValueMillis(100));
assertFalse(monitoringService.isMonitoringActive());
monitoringService.setInterval(TimeValue.MINUS_ONE);
assertFalse(monitoringService.isMonitoringActive());
}
public void testSkipExecution() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final BlockingExporter exporter = new BlockingExporter(latch);
Settings settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), MonitoringSettings.MIN_INTERVAL).build();
monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
logger.debug("start the monitoring service");
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
logger.debug("wait for the monitoring execution to be started");
assertBusy(() -> assertThat(exporter.getExportsCount(), equalTo(1)));
logger.debug("cancel current execution to avoid further execution once the latch is unblocked");
monitoringService.cancelExecution();
logger.debug("unblock the exporter");
latch.countDown();
logger.debug("verify that it hasn't been called more than one");
assertThat(exporter.getExportsCount(), equalTo(1));
}
class CountingExporter extends Exporters {
private final AtomicInteger exports = new AtomicInteger(0);
public CountingExporter() {
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
}
@Override
public void export(Collection<MonitoringDoc> docs) throws ExportException {
exports.incrementAndGet();
}
int getExportsCount() {
return exports.get();
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
}
class BlockingExporter extends CountingExporter {
private final CountDownLatch latch;
BlockingExporter(CountDownLatch latch) {
super();
this.latch = latch;
}
@Override
public void export(Collection<MonitoringDoc> docs) throws ExportException {
super.export(docs);
try {
latch.await();
} catch (InterruptedException e) {
throw new ExportException("BlockingExporter failed", e);
}
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
}
}

View File

@ -78,9 +78,8 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
assertThat(monitoringSettings.recoveryActiveOnly(), equalTo(recoveryActiveOnly)); assertThat(monitoringSettings.recoveryActiveOnly(), equalTo(recoveryActiveOnly));
} }
for (final AgentService service : internalCluster().getInstances(AgentService.class)) { for (final MonitoringService service : internalCluster().getInstances(MonitoringService.class)) {
assertThat(service.getSamplingInterval().millis(), equalTo(interval.millis())); assertThat(service.getInterval().millis(), equalTo(interval.millis()));
assertArrayEquals(service.collectors(), collectors);
} }
@ -124,8 +123,8 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
continue; continue;
} }
if (setting == MonitoringSettings.INTERVAL) { if (setting == MonitoringSettings.INTERVAL) {
for (final AgentService service : internalCluster().getInstances(AgentService.class)) { for (final MonitoringService service : internalCluster().getInstances(MonitoringService.class)) {
assertEquals(service.getSamplingInterval(), setting.get(updatedSettings)); assertEquals(service.getInterval(), setting.get(updatedSettings));
} }
} else { } else {
for (final MonitoringSettings monitoringSettings1 : internalCluster().getInstances(MonitoringSettings.class)) { for (final MonitoringSettings monitoringSettings1 : internalCluster().getInstances(MonitoringSettings.class)) {

View File

@ -180,8 +180,7 @@ public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOld
} finally { } finally {
/* Now we stop monitoring and disable the HTTP exporter. We also delete all data and checks multiple times /* Now we stop monitoring and disable the HTTP exporter. We also delete all data and checks multiple times
if they have not been re created by some in flight monitoring bulk request */ if they have not been re created by some in flight monitoring bulk request */
internalCluster().getInstances(AgentService.class).forEach(AgentService::stopCollection); internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::stop);
internalCluster().getInstances(AgentService.class).forEach(AgentService::stop);
Settings.Builder settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1"); Settings.Builder settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1");
if (httpExporter) { if (httpExporter) {

View File

@ -60,7 +60,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -52,7 +52,7 @@ public class ClusterInfoTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -58,7 +58,7 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -38,7 +38,7 @@ public class ClusterStatsTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -44,7 +44,7 @@ public class IndexRecoveryTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -36,7 +36,7 @@ public class IndexStatsTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -36,7 +36,7 @@ public class IndicesStatsTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -38,7 +38,7 @@ public class MultiNodesStatsTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -40,7 +40,7 @@ public class NodeStatsTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -52,7 +52,7 @@ public class ShardsTests extends MonitoringIntegTestCase {
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
updateMonitoringInterval(-1, TimeUnit.SECONDS); disableMonitoringInterval();
wipeMonitoringIndices(); wipeMonitoringIndices();
} }

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,12 +31,12 @@ import org.elasticsearch.xpack.XPackClient;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.MonitoredSystem; import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringService;
import org.elasticsearch.xpack.monitoring.MonitoringSettings; import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.AgentService; import org.elasticsearch.xpack.monitoring.client.MonitoringClient;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver; import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry; import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.monitoring.client.MonitoringClient;
import org.elasticsearch.xpack.security.Security; import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.authc.file.FileRealm; import org.elasticsearch.xpack.security.authc.file.FileRealm;
import org.elasticsearch.xpack.security.authc.support.Hasher; import org.elasticsearch.xpack.security.authc.support.Hasher;
@ -174,7 +175,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
startCollection(); startMonitoringService();
} }
@After @After
@ -182,7 +183,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
if (watcherEnabled != null && watcherEnabled) { if (watcherEnabled != null && watcherEnabled) {
internalCluster().getInstance(WatcherLifeCycleService.class, internalCluster().getMasterName()).stop(); internalCluster().getInstance(WatcherLifeCycleService.class, internalCluster().getMasterName()).stop();
} }
stopCollection(); stopMonitoringService();
super.tearDown(); super.tearDown();
} }
@ -202,36 +203,29 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
return false; return false;
} }
protected void stopCollection() { protected void startMonitoringService() {
for (AgentService agent : internalCluster().getInstances(AgentService.class)) { internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::start);
agent.stopCollection();
}
} }
protected void startCollection() { protected void stopMonitoringService() {
for (AgentService agent : internalCluster().getInstances(AgentService.class)) { internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::stop);
agent.startCollection();
}
} }
protected void wipeMonitoringIndices() throws Exception { protected void wipeMonitoringIndices() throws Exception {
CountDown retries = new CountDown(3); CountDown retries = new CountDown(3);
assertBusy(new Runnable() { assertBusy(() -> {
@Override try {
public void run() { boolean exist = client().admin().indices().prepareExists(MONITORING_INDICES_PREFIX + "*")
try { .get().isExists();
boolean exist = client().admin().indices().prepareExists(MONITORING_INDICES_PREFIX + "*") if (exist) {
.get().isExists(); deleteMonitoringIndices();
if (exist) { } else {
deleteMonitoringIndices();
} else {
retries.countDown();
}
} catch (IndexNotFoundException e) {
retries.countDown(); retries.countDown();
} }
assertThat(retries.isCountedDown(), is(true)); } catch (IndexNotFoundException e) {
retries.countDown();
} }
assertThat(retries.isCountedDown(), is(true));
}); });
} }
@ -364,6 +358,10 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
return parent + "." + field; return parent + "." + field;
} }
protected void disableMonitoringInterval() {
updateMonitoringInterval(TimeValue.MINUS_ONE.millis(), TimeUnit.MILLISECONDS);
}
protected void updateMonitoringInterval(long value, TimeUnit timeUnit) { protected void updateMonitoringInterval(long value, TimeUnit timeUnit) {
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), value, timeUnit))); Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), value, timeUnit)));