diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java index f8332e3b137..69ae271395f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.test.TestCluster; import org.elasticsearch.xpack.XPackClient; @@ -51,10 +51,8 @@ import org.junit.AfterClass; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -69,6 +67,7 @@ import static org.elasticsearch.xpack.monitoring.MonitoredSystem.KIBANA; import static org.elasticsearch.xpack.monitoring.MonitoredSystem.LOGSTASH; import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; public class LocalExporterTests extends MonitoringIntegTestCase { @@ -114,12 +113,12 @@ public class LocalExporterTests extends MonitoringIntegTestCase { @After public void stopMonitoring() throws Exception { - // We start by disabling the monitoring service, so that no more collection are started + // Now disabling the monitoring service, so that no more collection are started assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().putNull(MonitoringSettings.INTERVAL.getKey()))); // Exporters are still enabled, allowing on-going collections to be exported without errors. - // This assertion loop waits for in flight exportings to terminate. It checks that the latest + // This assertion loop waits for in flight exports to terminate. It checks that the latest // node_stats document collected for each node is at least 10 seconds old, corresponding to // 2 or 3 elapsed collection intervals. final int elapsedInSeconds = 10; @@ -130,11 +129,12 @@ public class LocalExporterTests extends MonitoringIntegTestCase { .subAggregation(max("agg_last_time_collected").field("timestamp"))) .get(); - StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); + Terms aggregation = response.getAggregations().get("agg_nodes_ids"); for (String nodeName : internalCluster().getNodeNames()) { String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); - StringTerms.Bucket bucket = aggregation.getBucketByKey(nodeId); - assertTrue(bucket != null && bucket.getDocCount() >= 1L); + Terms.Bucket bucket = aggregation.getBucketByKey(nodeId); + assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null); + assertTrue(bucket.getDocCount() >= 1L); Max subAggregation = bucket.getAggregations().get("agg_last_time_collected"); DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC); @@ -179,8 +179,32 @@ public class LocalExporterTests extends MonitoringIntegTestCase { } // local exporter is now enabled - assertAcked(client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(exporterSettings)); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + if (randomBoolean()) { + // export some documents now, before starting the monitoring service + final int nbDocs = randomIntBetween(1, 20); + List monitoringDocs = new ArrayList<>(nbDocs); + for (int i = 0; i < nbDocs; i++) { + monitoringDocs.add(createMonitoringBulkDoc(String.valueOf(i))); + } + + assertBusy(() -> { + MonitoringBulkRequestBuilder bulk = monitoringClient().prepareMonitoringBulk(); + monitoringDocs.forEach(bulk::add); + assertEquals(RestStatus.OK, bulk.get().status()); + refresh(); + + SearchResponse response = client().prepareSearch(".monitoring-*").get(); + assertEquals(nbDocs, response.getHits().getTotalHits()); + }); + + checkMonitoringTemplates(); + checkMonitoringPipeline(); + checkMonitoringAliases(); + checkMonitoringMappings(); + checkMonitoringDocs(); + } // monitoring service is started exporterSettings = Settings.builder() @@ -190,7 +214,9 @@ public class LocalExporterTests extends MonitoringIntegTestCase { final int numNodes = internalCluster().getNodeNames().length; assertBusy(() -> { - refresh(".monitoring-*"); + assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true)); + ensureYellow(".monitoring-*"); + assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("cluster_state") .get().getHits().getTotalHits(), greaterThan(0L)); @@ -221,13 +247,15 @@ public class LocalExporterTests extends MonitoringIntegTestCase { .addAggregation(terms("agg_nodes_ids").field("node_stats.node_id")) .get(); - StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); + Terms aggregation = response.getAggregations().get("agg_nodes_ids"); assertEquals("Aggregation on node_id must return a bucket per node involved in test", numNodes, aggregation.getBuckets().size()); for (String nodeName : internalCluster().getNodeNames()) { String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); - assertTrue(aggregation.getBucketByKey(nodeId).getDocCount() >= 1L); + Terms.Bucket bucket = aggregation.getBucketByKey(nodeId); + assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null); + assertTrue(bucket.getDocCount() >= 1L); } }, 30L, TimeUnit.SECONDS); @@ -348,17 +376,27 @@ public class LocalExporterTests extends MonitoringIntegTestCase { String type = hit.getType(); assertTrue(Strings.hasText(type)); + @SuppressWarnings("unchecked") + Map docSource = (Map) source.get("doc"); + + boolean isData; + MonitoredSystem expectedSystem; + if (docSource == null) { + // This is a document indexed by the Monitoring service + expectedSystem = MonitoredSystem.ES; + isData = "cluster_info".equals(type); + } else { + // This is a document indexed through the Monitoring Bulk API + expectedSystem = MonitoredSystem.fromSystem((String) docSource.get("expected_system")); + isData = (Boolean) docSource.getOrDefault("is_data", false); + } + Set expectedIndex = new HashSet<>(); - if ("cluster_info".equals(type) || type.startsWith("data")) { + if (isData) { expectedIndex.add(".monitoring-data-2"); } else { - MonitoredSystem system = MonitoredSystem.ES; - if (type.startsWith("timestamped")) { - system = MonitoredSystem.fromSystem(type.substring(type.indexOf("_") + 1)); - } - String dateTime = dateFormatter.print(dateParser.parseDateTime(timestamp)); - expectedIndex.add(".monitoring-" + system.getSystem() + "-2-" + dateTime); + expectedIndex.add(".monitoring-" + expectedSystem.getSystem() + "-2-" + dateTime); if ("node".equals(type)) { expectedIndex.add(".monitoring-data-2"); @@ -384,6 +422,10 @@ public class LocalExporterTests extends MonitoringIntegTestCase { try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { builder.startObject(); { + builder.field("expected_system", monitoringId); + if (index == MonitoringIndex.DATA) { + builder.field("is_data", true); + } final int nbFields = randomIntBetween(1, 3); for (int i = 0; i < nbFields; i++) { builder.field("field_" + i, i); @@ -393,7 +435,6 @@ public class LocalExporterTests extends MonitoringIntegTestCase { source = builder.bytes(); } - return new MonitoringBulkDoc(monitoringId, monitoringVersion, index, monitoringId, id, source, - xContentType); + return new MonitoringBulkDoc(monitoringId, monitoringVersion, index, "doc", id, source, xContentType); } }