[TEST] Fix monitoring tests that are flaky in the presence of replicas

Many tests in monitoring use the pattern of calling first awaitMonitoringDocsCount, and then doing a search that checks certain properties, assuming
that the doc count is correct at that point. In the presence of replicas, awaitMonitoringDocsCount might not wait for all shard copies to have the
desired property. A subsequent search might then hit a shard where the property does not hold. As these tests randomize the number of replicas
(through the random_index_template), it easier to constrain awaitMonitoringDocsCount just to the primary and then do subsequent checks just by
querying the primary.

Original commit: elastic/x-pack-elasticsearch@4165beb903
This commit is contained in:
Yannick Welsch 2017-04-26 17:22:14 +02:00
parent 8af91aed6a
commit 444cbfc283
10 changed files with 45 additions and 35 deletions

View File

@ -144,7 +144,7 @@ public class MonitoringBulkTests extends MonitoringIntegTestCase {
}
assertThat(exceptions, empty());
awaitMonitoringDocsCount(greaterThanOrEqualTo(total.get()), "concurrent");
awaitMonitoringDocsCountOnPrimary(greaterThanOrEqualTo(total.get()), "concurrent");
}
public void testUnsupportedSystem() throws Exception {

View File

@ -66,10 +66,10 @@ public class ClusterInfoTests extends MonitoringIntegTestCase {
awaitIndexExists(dataIndex);
// waiting for cluster info collector to collect data
awaitMonitoringDocsCount(equalTo(1L), ClusterInfoMonitoringDoc.TYPE);
awaitMonitoringDocsCountOnPrimary(equalTo(1L), ClusterInfoMonitoringDoc.TYPE);
// retrieving cluster info document
GetResponse response = client().prepareGet(dataIndex, ClusterInfoMonitoringDoc.TYPE, clusterUUID).get();
GetResponse response = client().prepareGet(dataIndex, ClusterInfoMonitoringDoc.TYPE, clusterUUID).setPreference("_primary").get();
assertTrue("cluster_info document does not exist in data index", response.isExists());
assertThat(response.getIndex(), equalTo(dataIndex));
@ -141,6 +141,7 @@ public class ClusterInfoTests extends MonitoringIntegTestCase {
assertHitCount(client().prepareSearch().setSize(0)
.setIndices(dataIndex)
.setTypes(ClusterInfoMonitoringDoc.TYPE)
.setPreference("_primary")
.setQuery(
QueryBuilders.boolQuery()
.should(QueryBuilders.matchQuery(License.Fields.STATUS, License.Status.ACTIVE.label()))

View File

@ -63,10 +63,10 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
public void testClusterState() throws Exception {
logger.debug("--> waiting for documents to be collected");
awaitMonitoringDocsCount(greaterThan(0L), ClusterStateResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), ClusterStateResolver.TYPE);
logger.debug("--> searching for monitoring documents of type [{}]", ClusterStateResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStateResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(ClusterStateResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");
@ -88,17 +88,16 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
*/
public void testNoNodesIndexing() throws Exception {
logger.debug("--> waiting for documents to be collected");
awaitMonitoringDocsCount(greaterThan(0L), ClusterStateResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), ClusterStateResolver.TYPE);
logger.debug("--> searching for monitoring documents of type [{}]", ClusterStateResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStateResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(ClusterStateResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
DiscoveryNodes nodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes();
logger.debug("--> ensure that the 'nodes' attributes of the cluster state document is not indexed");
assertHitCount(client().prepareSearch().setSize(0)
.setTypes(ClusterStateResolver.TYPE)
assertHitCount(client().prepareSearch().setSize(0).setTypes(ClusterStateResolver.TYPE).setPreference("_primary")
.setQuery(matchQuery("cluster_state.nodes." + nodes.getMasterNodeId() + ".name",
nodes.getMasterNode().getName())).get(), 0L);
}
@ -114,10 +113,11 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
awaitIndexExists(timestampedIndex);
logger.debug("--> waiting for documents to be collected");
awaitMonitoringDocsCount(greaterThanOrEqualTo(nbNodes), ClusterStateNodeMonitoringDoc.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThanOrEqualTo(nbNodes), ClusterStateNodeMonitoringDoc.TYPE);
logger.debug("--> searching for monitoring documents of type [{}]", ClusterStateNodeMonitoringDoc.TYPE);
SearchResponse response = client().prepareSearch(timestampedIndex).setTypes(ClusterStateNodeMonitoringDoc.TYPE).get();
SearchResponse response = client().prepareSearch(timestampedIndex).setTypes(ClusterStateNodeMonitoringDoc.TYPE)
.setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(nbNodes));
logger.debug("--> checking that every document contains the expected fields");
@ -143,6 +143,7 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
assertThat(client().prepareSearch().setSize(0)
.setIndices(timestampedIndex)
.setTypes(ClusterStateNodeMonitoringDoc.TYPE)
.setPreference("_primary")
.setQuery(QueryBuilders.matchQuery(MonitoringIndexNameResolver.Fields.SOURCE_NODE + ".attributes.custom", randomInt))
.get().getHits().getTotalHits(), greaterThan(0L));
@ -159,10 +160,10 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
awaitIndexExists(dataIndex);
logger.debug("--> waiting for documents to be collected");
awaitMonitoringDocsCount(greaterThanOrEqualTo(nbNodes), DiscoveryNodeResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThanOrEqualTo(nbNodes), DiscoveryNodeResolver.TYPE);
logger.debug("--> searching for monitoring documents of type [{}]", DiscoveryNodeResolver.TYPE);
SearchResponse response = client().prepareSearch(dataIndex).setTypes(DiscoveryNodeResolver.TYPE).get();
SearchResponse response = client().prepareSearch(dataIndex).setTypes(DiscoveryNodeResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(nbNodes));
logger.debug("--> checking that every document contains the expected fields");
@ -193,11 +194,13 @@ public class ClusterStateTests extends MonitoringIntegTestCase {
final String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
logger.debug("--> getting monitoring document for node id [{}]", nodeId);
assertThat(client().prepareGet(dataIndex, DiscoveryNodeResolver.TYPE, nodeId).get().isExists(), is(true));
assertThat(client().prepareGet(dataIndex, DiscoveryNodeResolver.TYPE, nodeId).setPreference("_primary").get().isExists(),
is(true));
// checks that document is not indexed
assertHitCount(client().prepareSearch(dataIndex).setSize(0)
.setTypes(DiscoveryNodeResolver.TYPE)
.setPreference("_primary")
.setQuery(QueryBuilders.boolQuery()
.should(matchQuery("node.id", nodeId))
.should(matchQuery("node.name", nodeName))).get(), 0);

View File

@ -60,13 +60,13 @@ public class ClusterStatsTests extends MonitoringIntegTestCase {
// ok.. we'll start collecting now...
updateMonitoringInterval(3L, TimeUnit.SECONDS);
awaitMonitoringDocsCount(greaterThan(0L), ClusterStatsResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), ClusterStatsResolver.TYPE);
assertBusy(new Runnable() {
@Override
public void run() {
logger.debug("--> checking that every document contains the expected fields");
SearchResponse response = client().prepareSearch().setTypes(ClusterStatsResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(ClusterStatsResolver.TYPE).setPreference("_primary").get();
for (SearchHit searchHit : response.getHits().getHits()) {
Map<String, Object> fields = searchHit.getSourceAsMap();

View File

@ -67,13 +67,13 @@ public class IndexRecoveryTests extends MonitoringIntegTestCase {
updateMonitoringInterval(3L, TimeUnit.SECONDS);
waitForMonitoringIndices();
awaitMonitoringDocsCount(greaterThan(0L), IndexRecoveryResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), IndexRecoveryResolver.TYPE);
String clusterUUID = client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID();
assertTrue(Strings.hasText(clusterUUID));
logger.debug("--> searching for monitoring documents of type [{}]", IndexRecoveryResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");
@ -97,11 +97,13 @@ public class IndexRecoveryTests extends MonitoringIntegTestCase {
refresh();
logger.debug("--> checking that cluster_uuid field is correctly indexed");
response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setSize(0).setQuery(existsQuery("cluster_uuid")).get();
response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setPreference("_primary").setSize(0)
.setQuery(existsQuery("cluster_uuid")).get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that timestamp field is correctly indexed");
response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setSize(0).setQuery(existsQuery("timestamp")).get();
response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setPreference("_primary").setSize(0)
.setQuery(existsQuery("timestamp")).get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that other fields are not indexed");
@ -115,7 +117,8 @@ public class IndexRecoveryTests extends MonitoringIntegTestCase {
};
for (String field : fields) {
response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setSize(0).setQuery(existsQuery(field)).get();
response = client().prepareSearch().setTypes(IndexRecoveryResolver.TYPE).setPreference("_primary").setSize(0)
.setQuery(existsQuery(field)).get();
assertHitCount(response, 0L);
}

View File

@ -62,7 +62,7 @@ public class IndexStatsTests extends MonitoringIntegTestCase {
updateMonitoringInterval(3L, TimeUnit.SECONDS);
waitForMonitoringIndices();
awaitMonitoringDocsCount(greaterThan(0L), IndexStatsResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), IndexStatsResolver.TYPE);
logger.debug("--> wait for index stats collector to collect stat for each index");
assertBusy(new Runnable() {
@ -74,6 +74,7 @@ public class IndexStatsTests extends MonitoringIntegTestCase {
SearchResponse count = client().prepareSearch()
.setSize(0)
.setTypes(IndexStatsResolver.TYPE)
.setPreference("_primary")
.setQuery(QueryBuilders.termQuery("index_stats.index", indices[i]))
.get();
assertThat(count.getHits().getTotalHits(), greaterThan(0L));
@ -82,7 +83,7 @@ public class IndexStatsTests extends MonitoringIntegTestCase {
});
logger.debug("--> searching for monitoring documents of type [{}]", IndexStatsResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(IndexStatsResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(IndexStatsResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");

View File

@ -73,10 +73,10 @@ public class IndicesStatsTests extends MonitoringIntegTestCase {
waitForMonitoringIndices();
logger.debug("--> wait for indices stats collector to collect global stat");
awaitMonitoringDocsCount(greaterThan(0L), IndicesStatsResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), IndicesStatsResolver.TYPE);
logger.debug("--> searching for monitoring documents of type [{}]", IndicesStatsResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(IndicesStatsResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(IndicesStatsResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");

View File

@ -54,9 +54,9 @@ public class NodeStatsTests extends MonitoringIntegTestCase {
updateMonitoringInterval(3L, TimeUnit.SECONDS);
waitForMonitoringIndices();
awaitMonitoringDocsCount(greaterThan(0L), NodeStatsResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), NodeStatsResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(NodeStatsResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(NodeStatsResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
for (SearchHit searchHit : response.getHits().getHits()) {

View File

@ -68,10 +68,10 @@ public class ShardsTests extends MonitoringIntegTestCase {
updateMonitoringInterval(1L, TimeUnit.SECONDS);
waitForMonitoringIndices();
awaitMonitoringDocsCount(greaterThan(0L), ShardsResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), ShardsResolver.TYPE);
logger.debug("--> searching for monitoring documents of type [{}]", ShardsResolver.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ShardsResolver.TYPE).get();
SearchResponse response = client().prepareSearch().setTypes(ShardsResolver.TYPE).setPreference("_primary").get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");
@ -98,11 +98,12 @@ public class ShardsTests extends MonitoringIntegTestCase {
updateMonitoringInterval(1L, TimeUnit.SECONDS);
waitForMonitoringIndices();
awaitMonitoringDocsCount(greaterThan(0L), ShardsResolver.TYPE);
awaitMonitoringDocsCountOnPrimary(greaterThan(0L), ShardsResolver.TYPE);
SearchRequestBuilder searchRequestBuilder = client()
.prepareSearch()
.setTypes(ShardsResolver.TYPE)
.setPreference("_primary")
.setQuery(QueryBuilders.termQuery("shard.index", indexName));
String[] notAnalyzedFields = {"state_uuid", "shard.state", "shard.index", "shard.node"};

View File

@ -247,18 +247,19 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
assertAcked(client().admin().indices().prepareDelete(MONITORING_INDICES_PREFIX + "*"));
}
protected void awaitMonitoringDocsCount(Matcher<Long> matcher, String... types) throws Exception {
assertBusy(() -> assertMonitoringDocsCount(matcher, types), 30, TimeUnit.SECONDS);
protected void awaitMonitoringDocsCountOnPrimary(Matcher<Long> matcher, String... types) throws Exception {
assertBusy(() -> assertMonitoringDocsCountOnPrimary(matcher, types), 30, TimeUnit.SECONDS);
}
protected void ensureMonitoringIndicesYellow() {
ensureYellow(".monitoring-es-*");
}
protected void assertMonitoringDocsCount(Matcher<Long> matcher, String... types) {
protected void assertMonitoringDocsCountOnPrimary(Matcher<Long> matcher, String... types) {
flushAndRefresh(MONITORING_INDICES_PREFIX + "*");
long count = client().prepareSearch(MONITORING_INDICES_PREFIX + "*").setSize(0).setTypes(types).get().getHits().getTotalHits();
logger.trace("--> searched for [{}] documents, found [{}]", Strings.arrayToCommaDelimitedString(types), count);
long count = client().prepareSearch(MONITORING_INDICES_PREFIX + "*").setSize(0).setTypes(types)
.setPreference("_primary").get().getHits().getTotalHits();
logger.trace("--> searched for [{}] documents on primary, found [{}]", Strings.arrayToCommaDelimitedString(types), count);
assertThat(count, matcher);
}