diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java index 7d97c63e91d..c63fd372cb6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.monitoring.exporter.local; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -23,10 +22,10 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.monitoring.MonitoredSystem; @@ -37,6 +36,8 @@ import org.elasticsearch.xpack.monitoring.action.MonitoringIndex; import org.elasticsearch.xpack.monitoring.exporter.Exporter; import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils; import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ -53,6 +54,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.elasticsearch.search.aggregations.AggregationBuilders.max; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.monitoring.MonitoredSystem.BEATS; @@ -93,27 +95,35 @@ public class LocalExporterTests extends MonitoringIntegTestCase { @After public void stopMonitoring() throws Exception { - logger.debug("stopping monitoring service"); + // We start by disabling the monitoring service, so that no more collection are started assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().putNull(MonitoringSettings.INTERVAL.getKey()))); - logger.debug("deleting monitoring indices, checking multiple times in case of in-flight bulk requests"); - awaitBusy(() -> { - try { - IndicesExistsResponse existsResponse = client().admin().indices() - .prepareExists(MONITORING_INDICES_PREFIX + "*").get(); - if (existsResponse.isExists()) { - deleteMonitoringIndices(); - } - } catch (IndexNotFoundException e) { - return false; - } catch (Exception e) { - throw new ElasticsearchException("Failed to delete monitoring indices: ", e); - } - return false; - }); + // Exporters are still enabled, allowing on-going collections to be exported without errors. + // This assertion loop waits for in flight exportings to terminate. It checks that the latest + // node_stats document collected for each node is at least 10 seconds old, corresponding to + // 2 or 3 elapsed collection intervals. + final int elapsedInSeconds = 10; + assertBusy(() -> { + refresh(".monitoring-es-2-*"); + SearchResponse response = client().prepareSearch(".monitoring-es-2-*").setTypes("node_stats").setSize(0) + .addAggregation(terms("agg_nodes_ids").field("node_stats.node_id") + .subAggregation(max("agg_last_time_collected").field("timestamp"))) + .get(); - logger.debug("disabling monitoring exporters"); + StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); + for (String nodeName : internalCluster().getNodeNames()) { + String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); + StringTerms.Bucket bucket = aggregation.getBucketByKey(nodeId); + assertTrue(bucket.getDocCount() >= 1L); + + Max subAggregation = bucket.getAggregations().get("agg_last_time_collected"); + DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC); + assertTrue(lastCollection.plusSeconds(elapsedInSeconds).isBefore(DateTime.now(DateTimeZone.UTC))); + } + }, 30L, TimeUnit.SECONDS); + + // We can now disable the exporters and reset the settings. assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() .putNull("xpack.monitoring.exporters._local.enabled")