From 0a860df9f9e1902e918dea1ade2562eb934c164a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 9 May 2017 09:39:25 +0200 Subject: [PATCH] [Test] Restore random documents indexing in LocalExporterTests (elastic/x-pack-elasticsearch#1328) In elastic/x-pack-elasticsearch#1212, we removed a randomized condition that inserts documents when the exporter is enabled but before the monitoring is started. This condition should not have been removed as it allows to test that the exporters are correctly initialized when the Monitoring Bulk API is used by an external application. This commit also fixed a failure when a search request fails because the monitoring indices are not yet ready (see https://github.com/elastic/x-pack-elasticsearch/issues/956#issuecomment-298338589) Original commit: elastic/x-pack-elasticsearch@73ab535ae005ffc2073ecd45c1942f1fa2341fe1 --- .../exporter/local/LocalExporterTests.java | 85 ++++++++++++++----- 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java index f8332e3b137..69ae271395f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.test.TestCluster; import org.elasticsearch.xpack.XPackClient; @@ -51,10 +51,8 @@ import org.junit.AfterClass; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -69,6 +67,7 @@ import static org.elasticsearch.xpack.monitoring.MonitoredSystem.KIBANA; import static org.elasticsearch.xpack.monitoring.MonitoredSystem.LOGSTASH; import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; public class LocalExporterTests extends MonitoringIntegTestCase { @@ -114,12 +113,12 @@ public class LocalExporterTests extends MonitoringIntegTestCase { @After public void stopMonitoring() throws Exception { - // We start by disabling the monitoring service, so that no more collection are started + // Now disabling the monitoring service, so that no more collection are started assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().putNull(MonitoringSettings.INTERVAL.getKey()))); // Exporters are still enabled, allowing on-going collections to be exported without errors. - // This assertion loop waits for in flight exportings to terminate. It checks that the latest + // This assertion loop waits for in flight exports to terminate. It checks that the latest // node_stats document collected for each node is at least 10 seconds old, corresponding to // 2 or 3 elapsed collection intervals. final int elapsedInSeconds = 10; @@ -130,11 +129,12 @@ public class LocalExporterTests extends MonitoringIntegTestCase { .subAggregation(max("agg_last_time_collected").field("timestamp"))) .get(); - StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); + Terms aggregation = response.getAggregations().get("agg_nodes_ids"); for (String nodeName : internalCluster().getNodeNames()) { String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); - StringTerms.Bucket bucket = aggregation.getBucketByKey(nodeId); - assertTrue(bucket != null && bucket.getDocCount() >= 1L); + Terms.Bucket bucket = aggregation.getBucketByKey(nodeId); + assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null); + assertTrue(bucket.getDocCount() >= 1L); Max subAggregation = bucket.getAggregations().get("agg_last_time_collected"); DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC); @@ -179,8 +179,32 @@ public class LocalExporterTests extends MonitoringIntegTestCase { } // local exporter is now enabled - assertAcked(client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(exporterSettings)); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + if (randomBoolean()) { + // export some documents now, before starting the monitoring service + final int nbDocs = randomIntBetween(1, 20); + List monitoringDocs = new ArrayList<>(nbDocs); + for (int i = 0; i < nbDocs; i++) { + monitoringDocs.add(createMonitoringBulkDoc(String.valueOf(i))); + } + + assertBusy(() -> { + MonitoringBulkRequestBuilder bulk = monitoringClient().prepareMonitoringBulk(); + monitoringDocs.forEach(bulk::add); + assertEquals(RestStatus.OK, bulk.get().status()); + refresh(); + + SearchResponse response = client().prepareSearch(".monitoring-*").get(); + assertEquals(nbDocs, response.getHits().getTotalHits()); + }); + + checkMonitoringTemplates(); + checkMonitoringPipeline(); + checkMonitoringAliases(); + checkMonitoringMappings(); + checkMonitoringDocs(); + } // monitoring service is started exporterSettings = Settings.builder() @@ -190,7 +214,9 @@ public class LocalExporterTests extends MonitoringIntegTestCase { final int numNodes = internalCluster().getNodeNames().length; assertBusy(() -> { - refresh(".monitoring-*"); + assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true)); + ensureYellow(".monitoring-*"); + assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("cluster_state") .get().getHits().getTotalHits(), greaterThan(0L)); @@ -221,13 +247,15 @@ public class LocalExporterTests extends MonitoringIntegTestCase { .addAggregation(terms("agg_nodes_ids").field("node_stats.node_id")) .get(); - StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); + Terms aggregation = response.getAggregations().get("agg_nodes_ids"); assertEquals("Aggregation on node_id must return a bucket per node involved in test", numNodes, aggregation.getBuckets().size()); for (String nodeName : internalCluster().getNodeNames()) { String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); - assertTrue(aggregation.getBucketByKey(nodeId).getDocCount() >= 1L); + Terms.Bucket bucket = aggregation.getBucketByKey(nodeId); + assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null); + assertTrue(bucket.getDocCount() >= 1L); } }, 30L, TimeUnit.SECONDS); @@ -348,17 +376,27 @@ public class LocalExporterTests extends MonitoringIntegTestCase { String type = hit.getType(); assertTrue(Strings.hasText(type)); + @SuppressWarnings("unchecked") + Map docSource = (Map) source.get("doc"); + + boolean isData; + MonitoredSystem expectedSystem; + if (docSource == null) { + // This is a document indexed by the Monitoring service + expectedSystem = MonitoredSystem.ES; + isData = "cluster_info".equals(type); + } else { + // This is a document indexed through the Monitoring Bulk API + expectedSystem = MonitoredSystem.fromSystem((String) docSource.get("expected_system")); + isData = (Boolean) docSource.getOrDefault("is_data", false); + } + Set expectedIndex = new HashSet<>(); - if ("cluster_info".equals(type) || type.startsWith("data")) { + if (isData) { expectedIndex.add(".monitoring-data-2"); } else { - MonitoredSystem system = MonitoredSystem.ES; - if (type.startsWith("timestamped")) { - system = MonitoredSystem.fromSystem(type.substring(type.indexOf("_") + 1)); - } - String dateTime = dateFormatter.print(dateParser.parseDateTime(timestamp)); - expectedIndex.add(".monitoring-" + system.getSystem() + "-2-" + dateTime); + expectedIndex.add(".monitoring-" + expectedSystem.getSystem() + "-2-" + dateTime); if ("node".equals(type)) { expectedIndex.add(".monitoring-data-2"); @@ -384,6 +422,10 @@ public class LocalExporterTests extends MonitoringIntegTestCase { try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { builder.startObject(); { + builder.field("expected_system", monitoringId); + if (index == MonitoringIndex.DATA) { + builder.field("is_data", true); + } final int nbFields = randomIntBetween(1, 3); for (int i = 0; i < nbFields; i++) { builder.field("field_" + i, i); @@ -393,7 +435,6 @@ public class LocalExporterTests extends MonitoringIntegTestCase { source = builder.bytes(); } - return new MonitoringBulkDoc(monitoringId, monitoringVersion, index, monitoringId, id, source, - xContentType); + return new MonitoringBulkDoc(monitoringId, monitoringVersion, index, "doc", id, source, xContentType); } }