diff --git a/elasticsearch/build.gradle b/elasticsearch/build.gradle
index ae0f33260ab..e5ec5f661bd 100644
--- a/elasticsearch/build.gradle
+++ b/elasticsearch/build.gradle
@@ -239,3 +239,9 @@ thirdPartyAudit.excludes = [
'javax.activation.UnsupportedDataTypeException'
]
+run {
+ setting 'xpack.graph.enabled', 'true'
+ setting 'xpack.security.enabled', 'true'
+ setting 'xpack.monitoring.enabled', 'true'
+ setting 'xpack.watcher.enabled', 'true'
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java
deleted file mode 100644
index f8ff2a06a27..00000000000
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/AgentService.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.monitoring;
-
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.settings.ClusterSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.CollectionUtils;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.ReleasableLock;
-import org.elasticsearch.xpack.monitoring.collector.Collector;
-import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
-import org.elasticsearch.xpack.monitoring.exporter.ExportException;
-import org.elasticsearch.xpack.monitoring.exporter.Exporter;
-import org.elasticsearch.xpack.monitoring.exporter.Exporters;
-import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * The {@code AgentService} is a service that does the work of publishing the details to the monitoring cluster.
- *
- * If this service is stopped, then the attached, monitored node is not going to publish its details to the monitoring cluster. Given
- * service life cycles, the intended way to temporarily stop the publishing is using the start and stop collection methods.
- *
- * @see #stopCollection()
- * @see #startCollection()
- */
-public class AgentService extends AbstractLifecycleComponent {
-
- private volatile ExportingWorker exportingWorker;
-
- private volatile Thread workerThread;
- private volatile long samplingIntervalMillis;
- private final Collection collectors;
- private final String[] settingsCollectors;
- private final Exporters exporters;
-
- public AgentService(Settings settings, ClusterSettings clusterSettings, Set collectors, Exporters exporters) {
- super(settings);
- this.samplingIntervalMillis = MonitoringSettings.INTERVAL.get(settings).millis();
- this.settingsCollectors = MonitoringSettings.COLLECTORS.get(settings).toArray(new String[0]);
- this.collectors = Collections.unmodifiableSet(filterCollectors(collectors, settingsCollectors));
- this.exporters = exporters;
-
- clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.INTERVAL, this::setInterval);
- }
-
- private void setInterval(TimeValue interval) {
- this.samplingIntervalMillis = interval.millis();
- applyIntervalSettings();
- }
-
- protected Set filterCollectors(Set collectors, String[] filters) {
- if (CollectionUtils.isEmpty(filters)) {
- return collectors;
- }
-
- Set list = new HashSet<>();
- for (Collector collector : collectors) {
- if (Regex.simpleMatch(filters, collector.name().toLowerCase(Locale.ROOT))) {
- list.add(collector);
- } else if (collector instanceof ClusterStatsCollector) {
- list.add(collector);
- }
- }
- return list;
- }
-
- protected void applyIntervalSettings() {
- if (samplingIntervalMillis <= 0) {
- logger.info("data sampling is disabled due to interval settings [{}]", samplingIntervalMillis);
- if (workerThread != null) {
-
- // notify worker to stop on its leisure, not to disturb an exporting operation
- exportingWorker.closed = true;
-
- exportingWorker = null;
- workerThread = null;
- }
- } else if (workerThread == null || !workerThread.isAlive()) {
-
- exportingWorker = new ExportingWorker();
- workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "monitoring.exporters"));
- workerThread.setDaemon(true);
- workerThread.start();
- }
- }
-
- /** stop collection and exporting. this method blocks until all background activity is guaranteed to be stopped */
- public void stopCollection() {
- final ExportingWorker worker = this.exportingWorker;
- if (worker != null) {
- worker.stopCollecting();
- }
- }
-
- public void startCollection() {
- final ExportingWorker worker = this.exportingWorker;
- if (worker != null) {
- worker.collecting = true;
- }
- }
-
- @Override
- protected void doStart() {
- logger.debug("monitoring service started");
- exporters.start();
- applyIntervalSettings();
- }
-
- @Override
- protected void doStop() {
- if (workerThread != null && workerThread.isAlive()) {
- exportingWorker.closed = true;
- workerThread.interrupt();
- try {
- workerThread.join(60000);
- } catch (InterruptedException e) {
- // we don't care...
- }
- }
-
- exporters.stop();
- }
-
- @Override
- protected void doClose() {
- for (Exporter exporter : exporters) {
- try {
- exporter.close();
- } catch (Exception e) {
- logger.error((Supplier>) () -> new ParameterizedMessage("failed to close exporter [{}]", exporter.name()), e);
- }
- }
- }
-
- public TimeValue getSamplingInterval() {
- return TimeValue.timeValueMillis(samplingIntervalMillis);
- }
-
- public String[] collectors() {
- return settingsCollectors;
- }
-
- class ExportingWorker implements Runnable {
-
- volatile boolean closed = false;
- volatile boolean collecting = true;
-
- final ReleasableLock collectionLock = new ReleasableLock(new ReentrantLock(false));
-
- @Override
- public void run() {
- while (!closed) {
- // sleep first to allow node to complete initialization before collecting the first start
- try {
- Thread.sleep(samplingIntervalMillis);
-
- if (closed) {
- continue;
- }
-
- try (Releasable ignore = collectionLock.acquire()) {
-
- Collection docs = collect();
-
- if ((docs.isEmpty() == false) && (closed == false)) {
- exporters.export(docs);
- }
- }
-
- } catch (ExportException e) {
- logger.error("exception when exporting documents", e);
- } catch (InterruptedException e) {
- logger.trace("interrupted");
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- logger.error("background thread had an uncaught exception", e);
- }
- }
- logger.debug("worker shutdown");
- }
-
- /** stop collection and exporting. this method will be block until background collection is actually stopped */
- public void stopCollecting() {
- collecting = false;
- collectionLock.acquire().close();
- }
-
- private Collection collect() {
- if (logger.isTraceEnabled()) {
- logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
- }
-
- Collection docs = new ArrayList<>();
- for (Collector collector : collectors) {
- if (collecting) {
- Collection result = collector.collect();
- if (result != null) {
- logger.trace("adding [{}] collected docs from [{}] collector", result.size(), collector.name());
- docs.addAll(result);
- } else {
- logger.trace("skipping collected docs from [{}] collector", collector.name());
- }
- }
- if (closed) {
- // Stop collecting if the worker is marked as closed
- break;
- }
- }
- return docs;
- }
- }
-}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
index ff29cd672ed..1b3fd93ae09 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
@@ -38,6 +38,7 @@ import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.ssl.SSLService;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -119,9 +120,10 @@ public class Monitoring implements ActionPlugin {
collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState));
collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client));
- final AgentService agentService = new AgentService(settings, clusterSettings, collectors, exporters);
+ final MonitoringService monitoringService =
+ new MonitoringService(settings, clusterSettings, threadPool, collectors, exporters);
- return Arrays.asList(agentService, monitoringSettings, exporters, cleanerService);
+ return Arrays.asList(monitoringService, monitoringSettings, exporters, cleanerService);
}
@Override
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java
new file mode 100644
index 00000000000..838a534c48f
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.monitoring.collector.Collector;
+import org.elasticsearch.xpack.monitoring.exporter.Exporter;
+import org.elasticsearch.xpack.monitoring.exporter.Exporters;
+import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The {@code MonitoringService} is a service that does the work of publishing the details to the monitoring cluster.
+ *
+ * If this service is stopped, then the attached, monitored node is not going to publish its details to the monitoring cluster. Given
+ * service life cycles, the intended way to temporarily stop the publishing is using the start and stop methods.
+ */
+public class MonitoringService extends AbstractLifecycleComponent {
+
+ /** State of the monitoring service, either started or stopped **/
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ /** Task in charge of collecting and exporting monitoring data **/
+ private final MonitoringExecution monitor = new MonitoringExecution();
+
+ private final ThreadPool threadPool;
+ private final Set collectors;
+ private final Exporters exporters;
+
+ private volatile TimeValue interval;
+ private volatile ThreadPool.Cancellable scheduler;
+
+ MonitoringService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
+ Set collectors, Exporters exporters) {
+ super(settings);
+ this.threadPool = Objects.requireNonNull(threadPool);
+ this.collectors = Objects.requireNonNull(collectors);
+ this.exporters = Objects.requireNonNull(exporters);
+ this.interval = MonitoringSettings.INTERVAL.get(settings);
+ clusterSettings.addSettingsUpdateConsumer(MonitoringSettings.INTERVAL, this::setInterval);
+ }
+
+ void setInterval(TimeValue interval) {
+ this.interval = interval;
+ scheduleExecution();
+ }
+
+ public TimeValue getInterval() {
+ return interval;
+ }
+
+ boolean isMonitoringActive() {
+ return isStarted()
+ && interval != null
+ && interval.millis() >= MonitoringSettings.MIN_INTERVAL.millis();
+ }
+
+ private String threadPoolName() {
+ return ThreadPool.Names.GENERIC;
+ }
+
+ boolean isStarted() {
+ return started.get();
+ }
+
+ @Override
+ protected void doStart() {
+ if (started.compareAndSet(false, true)) {
+ try {
+ logger.debug("monitoring service is starting");
+ scheduleExecution();
+ logger.debug("monitoring service started");
+ } catch (Exception e) {
+ logger.error((Supplier>) () -> new ParameterizedMessage("failed to start monitoring service"), e);
+ started.set(false);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ if (started.getAndSet(false)) {
+ logger.debug("monitoring service is stopping");
+ cancelExecution();
+ logger.debug("monitoring service stopped");
+ }
+ }
+
+ @Override
+ protected void doClose() {
+ logger.debug("monitoring service is closing");
+ closeExecution();
+
+ for (Exporter exporter : exporters) {
+ try {
+ exporter.close();
+ } catch (Exception e) {
+ logger.error((Supplier>) () -> new ParameterizedMessage("failed to close exporter [{}]", exporter.name()), e);
+ }
+ }
+ logger.debug("monitoring service closed");
+ }
+
+ void scheduleExecution() {
+ if (scheduler != null) {
+ cancelExecution();
+ }
+ if (isMonitoringActive()) {
+ scheduler = threadPool.scheduleWithFixedDelay(monitor, interval, threadPoolName());
+ }
+ }
+
+ void cancelExecution() {
+ if (scheduler != null) {
+ try {
+ scheduler.cancel();
+ } finally {
+ scheduler = null;
+ }
+ }
+ }
+
+ void closeExecution() {
+ try {
+ monitor.close();
+ } catch (IOException e) {
+ logger.error((Supplier>) () -> new ParameterizedMessage("failed to close monitoring execution"), e);
+ }
+ }
+
+ /**
+ * {@link MonitoringExecution} is a scheduled {@link Runnable} that periodically checks if monitoring
+ * data can be collected and exported. It runs at a given interval corresponding to the monitoring
+ * sampling interval. It first checks if monitoring is still enabled (because it might have changed
+ * since the last time the task was scheduled: interval set to -1 or the monitoring service is stopped).
+ * Since collecting and exporting data can take time, it uses a semaphore to track the current execution.
+ */
+ class MonitoringExecution extends AbstractRunnable implements Closeable {
+
+ /**
+ * Binary semaphore used to wait for monitoring execution to terminate before closing or stopping
+ * the monitoring service. A semaphore is preferred over a ReentrantLock because the lock is
+ * obtained by a thread and released by another thread.
+ **/
+ private final Semaphore semaphore = new Semaphore(1);
+
+ @Override
+ public void doRun() {
+ if (isMonitoringActive() == false) {
+ logger.debug("monitoring execution is skipped");
+ return;
+ }
+
+ if (semaphore.tryAcquire() == false) {
+ logger.debug("monitoring execution is skipped until previous execution terminated");
+ return;
+ }
+
+ threadPool.executor(threadPoolName()).submit(new AbstractRunnable() {
+ @Override
+ protected void doRun() throws Exception {
+ Collection results = new ArrayList<>();
+ for (Collector collector : collectors) {
+ if (isStarted() == false) {
+ // Do not collect more data if the the monitoring service is stopping
+ // otherwise some collectors might just fail.
+ return;
+ }
+
+ try {
+ Collection result = collector.collect();
+ if (result != null) {
+ results.addAll(result);
+ }
+ } catch (Exception e) {
+ logger.warn((Supplier>) () ->
+ new ParameterizedMessage("monitoring collector [{}] failed to collect data", collector.name()), e);
+ }
+ }
+ if (isMonitoringActive()) {
+ exporters.export(results);
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ logger.warn("monitoring execution failed", e);
+ }
+
+ @Override
+ public void onRejection(Exception e) {
+ logger.warn("monitoring execution has been rejected", e);
+ }
+
+ @Override
+ public void onAfter() {
+ semaphore.release();
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ logger.warn("monitoring execution failed", e);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ // Block until the lock can be acquired
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringSettings.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringSettings.java
index ddf147b3e50..13e4c4b327c 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringSettings.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringSettings.java
@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.monitoring;
-import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@@ -34,11 +33,23 @@ public class MonitoringSettings extends AbstractComponent {
*/
public static final TimeValue HISTORY_DURATION_MINIMUM = TimeValue.timeValueHours(24);
+ /**
+ * Minimum value for sampling interval (1 second)
+ */
+ static final TimeValue MIN_INTERVAL = TimeValue.timeValueSeconds(1L);
+
/**
* Sampling interval between two collections (default to 10s)
*/
- public static final Setting INTERVAL =
- timeSetting(collectionKey("interval"), TimeValue.timeValueSeconds(10), Property.Dynamic, Property.NodeScope);
+ public static final Setting INTERVAL = new Setting<>(collectionKey("interval"), "10s",
+ (s) -> {
+ TimeValue value = TimeValue.parseTimeValue(s, null, collectionKey("interval"));
+ if (TimeValue.MINUS_ONE.equals(value) || value.millis() >= MIN_INTERVAL.millis()) {
+ return value;
+ }
+ throw new IllegalArgumentException("Failed to parse monitoring interval [" + s + "], value must be >= " + MIN_INTERVAL);
+ },
+ Property.Dynamic, Property.NodeScope);
/**
* Timeout value when collecting index statistics (default to 10m)
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringPluginTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringPluginTests.java
index cc00210f73f..d94777dcda6 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringPluginTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringPluginTests.java
@@ -20,14 +20,13 @@ import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = TEST, transportClientRatio = 0, numClientNodes = 0, numDataNodes = 0)
public class MonitoringPluginTests extends MonitoringIntegTestCase {
-
@Override
- protected void startCollection() {
+ protected void startMonitoringService() {
// do nothing as monitoring is sometime unbound
}
@Override
- protected void stopCollection() {
+ protected void stopMonitoringService() {
// do nothing as monitoring is sometime unbound
}
@@ -44,7 +43,7 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
.put(XPackSettings.MONITORING_ENABLED.getKey(), true)
.build());
assertPluginIsLoaded();
- assertServiceIsBound(AgentService.class);
+ assertServiceIsBound(MonitoringService.class);
}
public void testMonitoringDisabled() {
@@ -52,7 +51,7 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
.put(XPackSettings.MONITORING_ENABLED.getKey(), false)
.build());
assertPluginIsLoaded();
- assertServiceIsNotBound(AgentService.class);
+ assertServiceIsNotBound(MonitoringService.class);
}
public void testMonitoringEnabledOnTribeNode() {
@@ -61,13 +60,13 @@ public class MonitoringPluginTests extends MonitoringIntegTestCase {
.put("tribe.name", "t1")
.build());
assertPluginIsLoaded();
- assertServiceIsBound(AgentService.class);
+ assertServiceIsBound(MonitoringService.class);
}
public void testMonitoringDisabledOnTribeNode() {
internalCluster().startNode(Settings.builder().put("tribe.name", "t1").build());
assertPluginIsLoaded();
- assertServiceIsNotBound(AgentService.class);
+ assertServiceIsNotBound(MonitoringService.class);
}
private void assertPluginIsLoaded() {
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java
new file mode 100644
index 00000000000..c0680c95877
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringServiceTests.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.xpack.monitoring.exporter.ExportException;
+import org.elasticsearch.xpack.monitoring.exporter.Exporters;
+import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptySet;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MonitoringServiceTests extends ESTestCase {
+
+ TestThreadPool threadPool;
+ MonitoringService monitoringService;
+ ClusterService clusterService;
+ ClusterSettings clusterSettings;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ threadPool = new TestThreadPool(getTestName());
+ clusterService = mock(ClusterService.class);
+ clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(MonitoringSettings.getSettings()));
+ when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+ }
+
+ @After
+ public void terminate() throws Exception {
+ if (monitoringService != null) {
+ monitoringService.close();
+ }
+ terminate(threadPool);
+ }
+
+ public void testIsMonitoringActive() throws Exception {
+ monitoringService = new MonitoringService(Settings.EMPTY, clusterSettings, threadPool, emptySet(), new CountingExporter());
+
+ monitoringService.start();
+ assertBusy(() -> assertTrue(monitoringService.isStarted()));
+ assertTrue(monitoringService.isMonitoringActive());
+
+ monitoringService.stop();
+ assertBusy(() -> assertFalse(monitoringService.isStarted()));
+ assertFalse(monitoringService.isMonitoringActive());
+
+ monitoringService.start();
+ assertBusy(() -> assertTrue(monitoringService.isStarted()));
+ assertTrue(monitoringService.isMonitoringActive());
+
+ monitoringService.close();
+ assertBusy(() -> assertFalse(monitoringService.isStarted()));
+ assertFalse(monitoringService.isMonitoringActive());
+ }
+
+ public void testInterval() throws Exception {
+ Settings settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), TimeValue.MINUS_ONE).build();
+
+ CountingExporter exporter = new CountingExporter();
+ monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
+
+ monitoringService.start();
+ assertBusy(() -> assertTrue(monitoringService.isStarted()));
+ assertFalse("interval -1 does not start the monitoring execution", monitoringService.isMonitoringActive());
+ assertEquals(0, exporter.getExportsCount());
+
+ monitoringService.setInterval(TimeValue.timeValueSeconds(1));
+ assertTrue(monitoringService.isMonitoringActive());
+ assertBusy(() -> assertThat(exporter.getExportsCount(), greaterThan(0)));
+
+ monitoringService.setInterval(TimeValue.timeValueMillis(100));
+ assertFalse(monitoringService.isMonitoringActive());
+
+ monitoringService.setInterval(TimeValue.MINUS_ONE);
+ assertFalse(monitoringService.isMonitoringActive());
+ }
+
+ public void testSkipExecution() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final BlockingExporter exporter = new BlockingExporter(latch);
+
+ Settings settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), MonitoringSettings.MIN_INTERVAL).build();
+ monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
+
+ logger.debug("start the monitoring service");
+ monitoringService.start();
+ assertBusy(() -> assertTrue(monitoringService.isStarted()));
+
+ logger.debug("wait for the monitoring execution to be started");
+ assertBusy(() -> assertThat(exporter.getExportsCount(), equalTo(1)));
+
+ logger.debug("cancel current execution to avoid further execution once the latch is unblocked");
+ monitoringService.cancelExecution();
+
+ logger.debug("unblock the exporter");
+ latch.countDown();
+
+ logger.debug("verify that it hasn't been called more than one");
+ assertThat(exporter.getExportsCount(), equalTo(1));
+ }
+
+ class CountingExporter extends Exporters {
+
+ private final AtomicInteger exports = new AtomicInteger(0);
+
+ public CountingExporter() {
+ super(Settings.EMPTY, Collections.emptyMap(), clusterService);
+ }
+
+ @Override
+ public void export(Collection docs) throws ExportException {
+ exports.incrementAndGet();
+ }
+
+ int getExportsCount() {
+ return exports.get();
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ protected void doClose() {
+ }
+ }
+
+ class BlockingExporter extends CountingExporter {
+
+ private final CountDownLatch latch;
+
+ BlockingExporter(CountDownLatch latch) {
+ super();
+ this.latch = latch;
+ }
+
+ @Override
+ public void export(Collection docs) throws ExportException {
+ super.export(docs);
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new ExportException("BlockingExporter failed", e);
+ }
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ protected void doClose() {
+ }
+ }
+}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringSettingsIntegTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringSettingsIntegTests.java
index 484b6dedd82..941f62f1dee 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringSettingsIntegTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/MonitoringSettingsIntegTests.java
@@ -78,9 +78,8 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
assertThat(monitoringSettings.recoveryActiveOnly(), equalTo(recoveryActiveOnly));
}
- for (final AgentService service : internalCluster().getInstances(AgentService.class)) {
- assertThat(service.getSamplingInterval().millis(), equalTo(interval.millis()));
- assertArrayEquals(service.collectors(), collectors);
+ for (final MonitoringService service : internalCluster().getInstances(MonitoringService.class)) {
+ assertThat(service.getInterval().millis(), equalTo(interval.millis()));
}
@@ -124,8 +123,8 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
continue;
}
if (setting == MonitoringSettings.INTERVAL) {
- for (final AgentService service : internalCluster().getInstances(AgentService.class)) {
- assertEquals(service.getSamplingInterval(), setting.get(updatedSettings));
+ for (final MonitoringService service : internalCluster().getInstances(MonitoringService.class)) {
+ assertEquals(service.getInterval(), setting.get(updatedSettings));
}
} else {
for (final MonitoringSettings monitoringSettings1 : internalCluster().getInstances(MonitoringSettings.class)) {
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/OldMonitoringIndicesBackwardsCompatibilityTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/OldMonitoringIndicesBackwardsCompatibilityTests.java
index 633633c5684..f224150849f 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/OldMonitoringIndicesBackwardsCompatibilityTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/OldMonitoringIndicesBackwardsCompatibilityTests.java
@@ -180,8 +180,7 @@ public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOld
} finally {
/* Now we stop monitoring and disable the HTTP exporter. We also delete all data and checks multiple times
if they have not been re created by some in flight monitoring bulk request */
- internalCluster().getInstances(AgentService.class).forEach(AgentService::stopCollection);
- internalCluster().getInstances(AgentService.class).forEach(AgentService::stop);
+ internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::stop);
Settings.Builder settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1");
if (httpExporter) {
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java
index c441906d13d..806e09812d2 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java
@@ -60,7 +60,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterInfoTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterInfoTests.java
index ea616673ae8..db69509965f 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterInfoTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterInfoTests.java
@@ -52,7 +52,7 @@ public class ClusterInfoTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStateTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStateTests.java
index d423dab6177..818ef4f498d 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStateTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStateTests.java
@@ -58,7 +58,7 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStatsTests.java
index d1c9b32d123..5dd16583c1c 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStatsTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/cluster/ClusterStatsTests.java
@@ -38,7 +38,7 @@ public class ClusterStatsTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexRecoveryTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexRecoveryTests.java
index d2a9714409d..8ec9c137013 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexRecoveryTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexRecoveryTests.java
@@ -44,7 +44,7 @@ public class IndexRecoveryTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexStatsTests.java
index d5c97f58f6f..5463dc11ad6 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexStatsTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndexStatsTests.java
@@ -36,7 +36,7 @@ public class IndexStatsTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndicesStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndicesStatsTests.java
index 3ecd98e3e90..344ce9bef85 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndicesStatsTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/indices/IndicesStatsTests.java
@@ -36,7 +36,7 @@ public class IndicesStatsTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/MultiNodesStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/MultiNodesStatsTests.java
index de9f4cd26c3..b64ab207c5e 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/MultiNodesStatsTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/MultiNodesStatsTests.java
@@ -38,7 +38,7 @@ public class MultiNodesStatsTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/NodeStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/NodeStatsTests.java
index 3fd25440dfb..a5cbedab221 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/NodeStatsTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/node/NodeStatsTests.java
@@ -40,7 +40,7 @@ public class NodeStatsTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/shards/ShardsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/shards/ShardsTests.java
index c71f0133b3d..deeabb494ca 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/shards/ShardsTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/resolver/shards/ShardsTests.java
@@ -52,7 +52,7 @@ public class ShardsTests extends MonitoringIntegTestCase {
@After
public void cleanup() throws Exception {
- updateMonitoringInterval(-1, TimeUnit.SECONDS);
+ disableMonitoringInterval();
wipeMonitoringIndices();
}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
index ff2a13649fd..5aa8087ad47 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
@@ -16,6 +16,7 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -30,12 +31,12 @@ import org.elasticsearch.xpack.XPackClient;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.monitoring.MonitoringService;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
-import org.elasticsearch.xpack.monitoring.AgentService;
+import org.elasticsearch.xpack.monitoring.client.MonitoringClient;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
-import org.elasticsearch.xpack.monitoring.client.MonitoringClient;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.authc.file.FileRealm;
import org.elasticsearch.xpack.security.authc.support.Hasher;
@@ -174,7 +175,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
- startCollection();
+ startMonitoringService();
}
@After
@@ -182,7 +183,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
if (watcherEnabled != null && watcherEnabled) {
internalCluster().getInstance(WatcherLifeCycleService.class, internalCluster().getMasterName()).stop();
}
- stopCollection();
+ stopMonitoringService();
super.tearDown();
}
@@ -202,36 +203,29 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
return false;
}
- protected void stopCollection() {
- for (AgentService agent : internalCluster().getInstances(AgentService.class)) {
- agent.stopCollection();
- }
+ protected void startMonitoringService() {
+ internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::start);
}
- protected void startCollection() {
- for (AgentService agent : internalCluster().getInstances(AgentService.class)) {
- agent.startCollection();
- }
+ protected void stopMonitoringService() {
+ internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::stop);
}
protected void wipeMonitoringIndices() throws Exception {
CountDown retries = new CountDown(3);
- assertBusy(new Runnable() {
- @Override
- public void run() {
- try {
- boolean exist = client().admin().indices().prepareExists(MONITORING_INDICES_PREFIX + "*")
- .get().isExists();
- if (exist) {
- deleteMonitoringIndices();
- } else {
- retries.countDown();
- }
- } catch (IndexNotFoundException e) {
+ assertBusy(() -> {
+ try {
+ boolean exist = client().admin().indices().prepareExists(MONITORING_INDICES_PREFIX + "*")
+ .get().isExists();
+ if (exist) {
+ deleteMonitoringIndices();
+ } else {
retries.countDown();
}
- assertThat(retries.isCountedDown(), is(true));
+ } catch (IndexNotFoundException e) {
+ retries.countDown();
}
+ assertThat(retries.isCountedDown(), is(true));
});
}
@@ -364,6 +358,10 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
return parent + "." + field;
}
+ protected void disableMonitoringInterval() {
+ updateMonitoringInterval(TimeValue.MINUS_ONE.millis(), TimeUnit.MILLISECONDS);
+ }
+
protected void updateMonitoringInterval(long value, TimeUnit timeUnit) {
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), value, timeUnit)));