[Test] Restore random documents indexing in LocalExporterTests (elastic/x-pack-elasticsearch#1328)

In elastic/x-pack-elasticsearch#1212, we removed a randomized condition that inserts documents when
the exporter is enabled but before the monitoring is started. This
condition should not have been removed as it allows to test that the
exporters are correctly initialized when the Monitoring Bulk API is used
 by an external application.

This commit also fixed a failure when a search request fails because the
 monitoring indices are not yet ready (see
 https://github.com/elastic/x-pack-elasticsearch/issues/956#issuecomment-298338589)

Original commit: elastic/x-pack-elasticsearch@73ab535ae0
This commit is contained in:
Tanguy Leroux 2017-05-09 09:39:25 +02:00 committed by GitHub
parent b450664766
commit 0a860df9f9
1 changed files with 63 additions and 22 deletions

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.TestCluster;
import org.elasticsearch.xpack.XPackClient; import org.elasticsearch.xpack.XPackClient;
@ -51,10 +51,8 @@ import org.junit.AfterClass;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -69,6 +67,7 @@ import static org.elasticsearch.xpack.monitoring.MonitoredSystem.KIBANA;
import static org.elasticsearch.xpack.monitoring.MonitoredSystem.LOGSTASH; import static org.elasticsearch.xpack.monitoring.MonitoredSystem.LOGSTASH;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX; import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
public class LocalExporterTests extends MonitoringIntegTestCase { public class LocalExporterTests extends MonitoringIntegTestCase {
@ -114,12 +113,12 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
@After @After
public void stopMonitoring() throws Exception { public void stopMonitoring() throws Exception {
// We start by disabling the monitoring service, so that no more collection are started // Now disabling the monitoring service, so that no more collection are started
assertAcked(client().admin().cluster().prepareUpdateSettings() assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(MonitoringSettings.INTERVAL.getKey()))); .setTransientSettings(Settings.builder().putNull(MonitoringSettings.INTERVAL.getKey())));
// Exporters are still enabled, allowing on-going collections to be exported without errors. // Exporters are still enabled, allowing on-going collections to be exported without errors.
// This assertion loop waits for in flight exportings to terminate. It checks that the latest // This assertion loop waits for in flight exports to terminate. It checks that the latest
// node_stats document collected for each node is at least 10 seconds old, corresponding to // node_stats document collected for each node is at least 10 seconds old, corresponding to
// 2 or 3 elapsed collection intervals. // 2 or 3 elapsed collection intervals.
final int elapsedInSeconds = 10; final int elapsedInSeconds = 10;
@ -130,11 +129,12 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
.subAggregation(max("agg_last_time_collected").field("timestamp"))) .subAggregation(max("agg_last_time_collected").field("timestamp")))
.get(); .get();
StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); Terms aggregation = response.getAggregations().get("agg_nodes_ids");
for (String nodeName : internalCluster().getNodeNames()) { for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
StringTerms.Bucket bucket = aggregation.getBucketByKey(nodeId); Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue(bucket != null && bucket.getDocCount() >= 1L); assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
Max subAggregation = bucket.getAggregations().get("agg_last_time_collected"); Max subAggregation = bucket.getAggregations().get("agg_last_time_collected");
DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC); DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC);
@ -179,8 +179,32 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
} }
// local exporter is now enabled // local exporter is now enabled
assertAcked(client().admin().cluster().prepareUpdateSettings() assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings));
.setTransientSettings(exporterSettings));
if (randomBoolean()) {
// export some documents now, before starting the monitoring service
final int nbDocs = randomIntBetween(1, 20);
List<MonitoringBulkDoc> monitoringDocs = new ArrayList<>(nbDocs);
for (int i = 0; i < nbDocs; i++) {
monitoringDocs.add(createMonitoringBulkDoc(String.valueOf(i)));
}
assertBusy(() -> {
MonitoringBulkRequestBuilder bulk = monitoringClient().prepareMonitoringBulk();
monitoringDocs.forEach(bulk::add);
assertEquals(RestStatus.OK, bulk.get().status());
refresh();
SearchResponse response = client().prepareSearch(".monitoring-*").get();
assertEquals(nbDocs, response.getHits().getTotalHits());
});
checkMonitoringTemplates();
checkMonitoringPipeline();
checkMonitoringAliases();
checkMonitoringMappings();
checkMonitoringDocs();
}
// monitoring service is started // monitoring service is started
exporterSettings = Settings.builder() exporterSettings = Settings.builder()
@ -190,7 +214,9 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
final int numNodes = internalCluster().getNodeNames().length; final int numNodes = internalCluster().getNodeNames().length;
assertBusy(() -> { assertBusy(() -> {
refresh(".monitoring-*"); assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true));
ensureYellow(".monitoring-*");
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("cluster_state") assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("cluster_state")
.get().getHits().getTotalHits(), greaterThan(0L)); .get().getHits().getTotalHits(), greaterThan(0L));
@ -221,13 +247,15 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
.addAggregation(terms("agg_nodes_ids").field("node_stats.node_id")) .addAggregation(terms("agg_nodes_ids").field("node_stats.node_id"))
.get(); .get();
StringTerms aggregation = response.getAggregations().get("agg_nodes_ids"); Terms aggregation = response.getAggregations().get("agg_nodes_ids");
assertEquals("Aggregation on node_id must return a bucket per node involved in test", assertEquals("Aggregation on node_id must return a bucket per node involved in test",
numNodes, aggregation.getBuckets().size()); numNodes, aggregation.getBuckets().size());
for (String nodeName : internalCluster().getNodeNames()) { for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
assertTrue(aggregation.getBucketByKey(nodeId).getDocCount() >= 1L); Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
} }
}, 30L, TimeUnit.SECONDS); }, 30L, TimeUnit.SECONDS);
@ -348,17 +376,27 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
String type = hit.getType(); String type = hit.getType();
assertTrue(Strings.hasText(type)); assertTrue(Strings.hasText(type));
Set<String> expectedIndex = new HashSet<>(); @SuppressWarnings("unchecked")
if ("cluster_info".equals(type) || type.startsWith("data")) { Map<String, Object> docSource = (Map<String, Object>) source.get("doc");
expectedIndex.add(".monitoring-data-2");
boolean isData;
MonitoredSystem expectedSystem;
if (docSource == null) {
// This is a document indexed by the Monitoring service
expectedSystem = MonitoredSystem.ES;
isData = "cluster_info".equals(type);
} else { } else {
MonitoredSystem system = MonitoredSystem.ES; // This is a document indexed through the Monitoring Bulk API
if (type.startsWith("timestamped")) { expectedSystem = MonitoredSystem.fromSystem((String) docSource.get("expected_system"));
system = MonitoredSystem.fromSystem(type.substring(type.indexOf("_") + 1)); isData = (Boolean) docSource.getOrDefault("is_data", false);
} }
Set<String> expectedIndex = new HashSet<>();
if (isData) {
expectedIndex.add(".monitoring-data-2");
} else {
String dateTime = dateFormatter.print(dateParser.parseDateTime(timestamp)); String dateTime = dateFormatter.print(dateParser.parseDateTime(timestamp));
expectedIndex.add(".monitoring-" + system.getSystem() + "-2-" + dateTime); expectedIndex.add(".monitoring-" + expectedSystem.getSystem() + "-2-" + dateTime);
if ("node".equals(type)) { if ("node".equals(type)) {
expectedIndex.add(".monitoring-data-2"); expectedIndex.add(".monitoring-data-2");
@ -384,6 +422,10 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) {
builder.startObject(); builder.startObject();
{ {
builder.field("expected_system", monitoringId);
if (index == MonitoringIndex.DATA) {
builder.field("is_data", true);
}
final int nbFields = randomIntBetween(1, 3); final int nbFields = randomIntBetween(1, 3);
for (int i = 0; i < nbFields; i++) { for (int i = 0; i < nbFields; i++) {
builder.field("field_" + i, i); builder.field("field_" + i, i);
@ -393,7 +435,6 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
source = builder.bytes(); source = builder.bytes();
} }
return new MonitoringBulkDoc(monitoringId, monitoringVersion, index, monitoringId, id, source, return new MonitoringBulkDoc(monitoringId, monitoringVersion, index, "doc", id, source, xContentType);
xContentType);
} }
} }