[Test] LocalExporterTests should wait for exporters to terminate in a finally block (elastic/x-pack-elasticsearch#1581)

At the end of the test, LocalExporterTests checks if no more monitoring
 data are exporter by checking multiple times the last time nodes_stats
 documents were exported, stopping after 10 seconds. It does this in a
 @After annotated method but it would be better to do this in a finally
 block. Also, it should search for node_stats documents only if the
 monitoring indices exist and are searchable to avoid some "all shards
 failed" failures.

Original commit: elastic/x-pack-elasticsearch@90ffb4affd
This commit is contained in:
Tanguy Leroux 2017-06-02 09:12:49 +02:00 committed by GitHub
parent 922a337884
commit 261bf8d78d
1 changed files with 139 additions and 126 deletions

View File

@ -46,7 +46,6 @@ import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.After;
import org.junit.AfterClass;
import java.io.IOException;
@ -66,6 +65,7 @@ import static org.elasticsearch.xpack.monitoring.MonitoredSystem.BEATS;
import static org.elasticsearch.xpack.monitoring.MonitoredSystem.KIBANA;
import static org.elasticsearch.xpack.monitoring.MonitoredSystem.LOGSTASH;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@ -111,8 +111,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
.build();
}
@After
public void stopMonitoring() throws Exception {
private void stopMonitoring() throws Exception {
// Now disabling the monitoring service, so that no more collection are started
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(MonitoringSettings.INTERVAL.getKey())));
@ -122,23 +121,31 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
// node_stats document collected for each node is at least 10 seconds old, corresponding to
// 2 or 3 elapsed collection intervals.
final int elapsedInSeconds = 10;
final DateTime startTime = DateTime.now(DateTimeZone.UTC);
assertBusy(() -> {
refresh(".monitoring-es-2-*");
SearchResponse response = client().prepareSearch(".monitoring-es-2-*").setTypes("node_stats").setSize(0)
.addAggregation(terms("agg_nodes_ids").field("node_stats.node_id")
.subAggregation(max("agg_last_time_collected").field("timestamp")))
.get();
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(".monitoring-*").get();
if (indicesExistsResponse.isExists()) {
ensureYellow(".monitoring-*");
refresh(".monitoring-es-*");
Terms aggregation = response.getAggregations().get("agg_nodes_ids");
for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
SearchResponse response = client().prepareSearch(".monitoring-es-*").setTypes("node_stats").setSize(0)
.addAggregation(terms("agg_nodes_ids").field("node_stats.node_id")
.subAggregation(max("agg_last_time_collected").field("timestamp")))
.get();
Max subAggregation = bucket.getAggregations().get("agg_last_time_collected");
DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC);
assertTrue(lastCollection.plusSeconds(elapsedInSeconds).isBefore(DateTime.now(DateTimeZone.UTC)));
Terms aggregation = response.getAggregations().get("agg_nodes_ids");
for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
Max subAggregation = bucket.getAggregations().get("agg_last_time_collected");
DateTime lastCollection = new DateTime(Math.round(subAggregation.getValue()), DateTimeZone.UTC);
assertTrue(lastCollection.plusSeconds(elapsedInSeconds).isBefore(DateTime.now(DateTimeZone.UTC)));
}
} else {
assertTrue(DateTime.now(DateTimeZone.UTC).isAfter(startTime.plusSeconds(elapsedInSeconds)));
}
}, 30L, TimeUnit.SECONDS);
@ -150,125 +157,131 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
}
public void testExport() throws Exception {
if (randomBoolean()) {
// indexing some random documents
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[5];
for (int i = 0; i < indexRequestBuilders.length; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i))
.setSource("title", "This is a random document");
}
indexRandom(true, indexRequestBuilders);
}
if (randomBoolean()) {
// create some marvel indices to check if aliases are correctly created
final int oldies = randomIntBetween(1, 5);
for (int i = 0; i < oldies; i++) {
assertAcked(client().admin().indices().prepareCreate(".marvel-es-1-2014.12." + i)
.setSettings("number_of_shards", 1, "number_of_replicas", 0).get());
}
}
Settings.Builder exporterSettings = Settings.builder()
.put("xpack.monitoring.exporters._local.enabled", true);
String timeFormat = indexTimeFormat.get();
if (timeFormat != null) {
exporterSettings.put("xpack.monitoring.exporters._local.index.name.time_format",
timeFormat);
}
// local exporter is now enabled
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings));
if (randomBoolean()) {
// export some documents now, before starting the monitoring service
final int nbDocs = randomIntBetween(1, 20);
List<MonitoringBulkDoc> monitoringDocs = new ArrayList<>(nbDocs);
for (int i = 0; i < nbDocs; i++) {
monitoringDocs.add(createMonitoringBulkDoc(String.valueOf(i)));
try {
if (randomBoolean()) {
// indexing some random documents
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[5];
for (int i = 0; i < indexRequestBuilders.length; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i))
.setSource("title", "This is a random document");
}
indexRandom(true, indexRequestBuilders);
}
if (randomBoolean()) {
// create some marvel indices to check if aliases are correctly created
final int oldies = randomIntBetween(1, 5);
for (int i = 0; i < oldies; i++) {
assertAcked(client().admin().indices().prepareCreate(".marvel-es-1-2014.12." + i)
.setSettings("number_of_shards", 1, "number_of_replicas", 0).get());
}
}
Settings.Builder exporterSettings = Settings.builder()
.put("xpack.monitoring.exporters._local.enabled", true);
String timeFormat = indexTimeFormat.get();
if (timeFormat != null) {
exporterSettings.put("xpack.monitoring.exporters._local.index.name.time_format",
timeFormat);
}
// local exporter is now enabled
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings));
if (randomBoolean()) {
// export some documents now, before starting the monitoring service
final int nbDocs = randomIntBetween(1, 20);
List<MonitoringBulkDoc> monitoringDocs = new ArrayList<>(nbDocs);
for (int i = 0; i < nbDocs; i++) {
monitoringDocs.add(createMonitoringBulkDoc(String.valueOf(i)));
}
assertBusy(() -> {
MonitoringBulkRequestBuilder bulk = monitoringClient().prepareMonitoringBulk();
monitoringDocs.forEach(bulk::add);
assertEquals(RestStatus.OK, bulk.get().status());
refresh();
assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true));
ensureYellow(".monitoring-*");
SearchResponse response = client().prepareSearch(".monitoring-*").get();
assertEquals(nbDocs, response.getHits().getTotalHits());
});
checkMonitoringTemplates();
checkMonitoringPipeline();
checkMonitoringAliases();
checkMonitoringMappings();
checkMonitoringDocs();
}
// monitoring service is started
exporterSettings = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), 3L, TimeUnit.SECONDS);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(exporterSettings));
final int numNodes = internalCluster().getNodeNames().length;
assertBusy(() -> {
MonitoringBulkRequestBuilder bulk = monitoringClient().prepareMonitoringBulk();
monitoringDocs.forEach(bulk::add);
assertEquals(RestStatus.OK, bulk.get().status());
refresh();
assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true));
ensureYellow(".monitoring-*");
SearchResponse response = client().prepareSearch(".monitoring-*").get();
assertEquals(nbDocs, response.getHits().getTotalHits());
});
assertThat(client().prepareSearch(".monitoring-es-*").setTypes("cluster_state")
.get().getHits().getTotalHits(), greaterThan(0L));
assertEquals(0L, client().prepareSearch(".monitoring-es-*").setTypes("node")
.get().getHits().getTotalHits() % numNodes);
assertThat(client().prepareSearch(".monitoring-es-*").setTypes("cluster_stats")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-*").setTypes("index_recovery")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-*").setTypes("index_stats")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-*").setTypes("indices_stats")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-*").setTypes("shards")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-data-2").setTypes("cluster_info")
.get().getHits().getTotalHits(), greaterThan(0L));
SearchResponse response = client().prepareSearch(".monitoring-es-*")
.setTypes("node_stats")
.setSize(0)
.addAggregation(terms("agg_nodes_ids").field("node_stats.node_id"))
.get();
Terms aggregation = response.getAggregations().get("agg_nodes_ids");
assertEquals("Aggregation on node_id must return a bucket per node involved in test",
numNodes, aggregation.getBuckets().size());
for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
}
}, 30L, TimeUnit.SECONDS);
checkMonitoringTemplates();
checkMonitoringPipeline();
checkMonitoringAliases();
checkMonitoringMappings();
checkMonitoringWatches();
checkMonitoringDocs();
logger.info("All checks passed.");
} finally {
stopMonitoring();
}
// monitoring service is started
exporterSettings = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), 3L, TimeUnit.SECONDS);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(exporterSettings));
final int numNodes = internalCluster().getNodeNames().length;
assertBusy(() -> {
assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true));
ensureYellow(".monitoring-*");
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("cluster_state")
.get().getHits().getTotalHits(), greaterThan(0L));
assertEquals(0L, client().prepareSearch(".monitoring-es-2-*").setTypes("node")
.get().getHits().getTotalHits() % numNodes);
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("cluster_stats")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("index_recovery")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("index_stats")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("indices_stats")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-2-*").setTypes("shards")
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-data-2").setTypes("cluster_info")
.get().getHits().getTotalHits(), greaterThan(0L));
SearchResponse response = client().prepareSearch(".monitoring-es-2-*")
.setTypes("node_stats")
.setSize(0)
.addAggregation(terms("agg_nodes_ids").field("node_stats.node_id"))
.get();
Terms aggregation = response.getAggregations().get("agg_nodes_ids");
assertEquals("Aggregation on node_id must return a bucket per node involved in test",
numNodes, aggregation.getBuckets().size());
for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
}
}, 30L, TimeUnit.SECONDS);
checkMonitoringTemplates();
checkMonitoringPipeline();
checkMonitoringAliases();
checkMonitoringMappings();
checkMonitoringWatches();
checkMonitoringDocs();
}
/**
@ -310,7 +323,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
assertEquals("marvel index should have at least 1 alias: " + index, 1, aliases.size());
String indexDate = index.substring(".marvel-es-1-".length());
String expectedAlias = ".monitoring-es-2-" + indexDate + "-alias";
String expectedAlias = ".monitoring-es-" + TEMPLATE_VERSION + "-" + indexDate + "-alias";
assertEquals(expectedAlias, aliases.get(0).getAlias());
}
}
@ -399,7 +412,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
expectedIndex.add(".monitoring-data-2");
} else {
String dateTime = dateFormatter.print(dateParser.parseDateTime(timestamp));
expectedIndex.add(".monitoring-" + expectedSystem.getSystem() + "-2-" + dateTime);
expectedIndex.add(".monitoring-" + expectedSystem.getSystem() + "-" + TEMPLATE_VERSION + "-" + dateTime);
if ("node".equals(type)) {
expectedIndex.add(".monitoring-data-2");
@ -417,7 +430,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
private static MonitoringBulkDoc createMonitoringBulkDoc(String id) throws IOException {
String monitoringId = randomFrom(BEATS, KIBANA, LOGSTASH).getSystem();
String monitoringVersion = MonitoringTemplateUtils.TEMPLATE_VERSION;
String monitoringVersion = TEMPLATE_VERSION;
MonitoringIndex index = randomFrom(MonitoringIndex.values());
XContentType xContentType = randomFrom(XContentType.values());