[TEST] Fix OldMonitoringIndicesBackwardsCompatibilityTests

This commit aims to fix the regularly failing test OldMonitoringIndicesBackwardsCompatibilityTests. The test now enables and disables monitoring data collection by setting the collection interval (with "-1" disabling it), stops the AgentService instances directly, and, at the end of the test, deletes the monitoring indices and checks multiple times that they have not been re-created by an in-flight monitoring bulk request.

closes elastic/elasticsearch#3999

Original commit: elastic/x-pack-elasticsearch@8ac785061a
Tanguy Leroux 2016-11-21 16:15:42 +01:00
parent 18478d63c2
commit 7c1f4326fc
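
The core of the fix is the cleanup performed at the end of the test: monitoring is stopped, the HTTP exporter is disabled, and the .marvel-*/.monitoring-* indices are deleted repeatedly until a number of checks find nothing left, since an in-flight monitoring bulk request can re-create them. Below is a minimal standalone sketch of that retry-and-delete pattern in plain Java; the class name, the in-memory index set, and the 100 ms pause are illustrative stand-ins, while the actual test (see the diff below) drives the same idea with CountDown and assertBusy against a real cluster.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Standalone illustration of the test's cleanup pattern: delete the monitoring indices and
 * only declare the cleanup done once enough checks have found nothing left, deleting again
 * whenever something (e.g. an in-flight bulk request) has re-created them in the meantime.
 */
public class RetriedCleanupSketch {

    // Stand-in for the cluster's index store; the real test queries and deletes actual indices.
    private final Set<String> indices = ConcurrentHashMap.newKeySet();

    boolean indicesExist() {
        return indices.isEmpty() == false;
    }

    void deleteIndices() {
        indices.clear();
    }

    /** Returns true once {@code clearChecksNeeded} checks saw no indices within {@code maxAttempts} tries. */
    boolean cleanUp(int clearChecksNeeded, int maxAttempts) throws InterruptedException {
        int clearChecks = 0;
        for (int attempt = 0; attempt < maxAttempts && clearChecks < clearChecksNeeded; attempt++) {
            if (indicesExist()) {
                deleteIndices();   // something re-created the indices, delete them again
            } else {
                clearChecks++;     // one more check that found nothing left behind
            }
            Thread.sleep(100);     // leave time for any in-flight request to land
        }
        return clearChecks >= clearChecksNeeded;
    }
}

With the test's numbers this would be called as cleanUp(5, someLargerBound), mirroring the CountDown(5) inside the assertBusy block in the diff below.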


@@ -5,20 +5,21 @@
  */
 package org.elasticsearch.xpack.monitoring;
 
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
 import org.elasticsearch.AbstractOldXPackIndicesBackwardsCompatibilityTestCase;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.test.SecuritySettingsSource;
 import org.elasticsearch.xpack.XPackSettings;
+import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
 import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
 import org.elasticsearch.xpack.monitoring.resolver.cluster.ClusterStateResolver;
 import org.elasticsearch.xpack.monitoring.resolver.indices.IndexStatsResolver;
@@ -42,13 +43,13 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.is;
 
 /**
  * Tests for monitoring indexes created before {@link Version#CURRENT}.
  */
-//Give ourselves 30 seconds instead o 5 to shut down. Sometimes it takes a while, especially on weak hardware. But we do get there.
-@ThreadLeakLingering(linger = 30000)
 public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOldXPackIndicesBackwardsCompatibilityTestCase {
 
     private final boolean httpExporter = randomBoolean();
 
     @Override
@@ -57,8 +58,8 @@ public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOld
                 .put(XPackSettings.MONITORING_ENABLED.getKey(), true)
                 // Don't clean old monitoring indexes - we want to make sure we can load them
                 .put(MonitoringSettings.HISTORY_DURATION.getKey(), TimeValue.timeValueHours(1000 * 365 * 24).getStringRep())
-                // Speed up the exporter so we don't have to wait around for it
-                .put(MonitoringSettings.INTERVAL.getKey(), timeValueSeconds(1).getStringRep());
+                // Do not start monitoring exporters at startup
+                .put(MonitoringSettings.INTERVAL.getKey(), "-1");
 
         if (httpExporter) {
             /* If we want to test the http exporter we have to create it but disable it. We need to create it so we don't use the default
@@ -84,83 +85,103 @@ public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOld
     @Override
     protected void checkVersion(Version version) throws Exception {
         try {
-            checkVersionInternal(version);
-        } finally {
-            /* Shut down monitoring after every test because we've shrunk the collection interval enough that we'll have trouble shutting
-             * down cleanly unless we force monitoring to stop. */
-            internalCluster().getInstance(AgentService.class).stop();
-        }
-    }
-
-    private void checkVersionInternal(Version version) throws Exception {
-        if (version.before(Version.V_2_3_0)) {
-            // Slow down monitoring from its previously super fast pace so we can shut down without trouble
-            Settings.Builder settings = Settings.builder()
-                    .put(MonitoringSettings.INTERVAL.getKey(), timeValueSeconds(10).getStringRep());
+            if (httpExporter) {
+                // If we're using the http exporter we need to update the port and enable it
+                NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().get();
+                TransportAddress publishAddress = nodeInfos.getNodes().get(0).getHttp().address().publishAddress();
+                InetSocketAddress address = publishAddress.address();
+                Settings.Builder settings = Settings.builder();
+                setupHttpExporter(settings, address.getPort());
+                logger.info("--> Enabling http exporter pointing to [localhost:{}]", address.getPort());
+                assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
+            }
+
+            // Monitoring can now start to collect new data
+            Settings.Builder settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), timeValueSeconds(3).getStringRep());
             assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
 
-            /* We can't do anything with indexes created before 2.3 so we just assert that we didn't delete them or do anything otherwise
-             * crazy. */
-            SearchResponse response = client().prepareSearch(".marvel-es-data").get();
-            // 2.0.x didn't index the nodes info
-            long expectedEsData = version.before(Version.V_2_1_0) ? 1 : 2;
-            assertHitCount(response, expectedEsData);
-            response = client().prepareSearch(".marvel-es-*").get();
-            assertThat(response.getHits().totalHits(), greaterThanOrEqualTo(20L));
-            return;
+            // And we wait until data have been indexed locally using either by the local or http exporter
+            MonitoringIndexNameResolver.Timestamped resolver = new IndexStatsResolver(MonitoredSystem.ES, Settings.EMPTY);
+            MonitoringDoc monitoringDoc = new MonitoringDoc(MonitoredSystem.ES.getSystem(), randomAsciiOfLength(2));
+            monitoringDoc.setTimestamp(System.currentTimeMillis());
+            final String expectedIndex = resolver.index(monitoringDoc);
+
+            logger.info("--> {} Waiting for [{}] to be created", Thread.currentThread().getName(), expectedIndex);
+            assertBusy(() -> assertTrue(client().admin().indices().prepareExists(expectedIndex).get().isExists()));
+            assertBusy(() -> assertThat(client().prepareSearch(expectedIndex).setSize(0).get().getHits().getTotalHits(), greaterThan(1L)));
+
+            if (version.before(Version.V_2_3_0)) {
+                /* We can't do anything with indexes created before 2.3 so we just assert that we didn't delete them or do
+                   anything otherwise crazy. */
+                SearchResponse response = client().prepareSearch(".marvel-es-data").get();
+                // 2.0.x didn't index the nodes info
+                long expectedEsData = version.before(Version.V_2_1_0) ? 1 : 2;
+                assertHitCount(response, expectedEsData);
+                response = client().prepareSearch(".marvel-es-*").get();
+                assertThat(response.getHits().totalHits(), greaterThanOrEqualTo(20L));
+                return;
+            }
+
+            /* Indexes created from 2.3 onwards get aliased to the place they'd be if they were created in 5.0 so queries should just work.
+             * Monitoring doesn't really have a Java API so we can't test that, but we can test that we write the data we expected to
+             * write. */
+            SearchResponse firstIndexStats = search(resolver, greaterThanOrEqualTo(10L));
+
+            // All the other aliases should have been created by now so we can assert that we have the data we saved in the bwc indexes
+            SearchResponse firstShards = search(new ShardsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(10L));
+            SearchResponse firstIndices = search(new IndicesStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
+            SearchResponse firstNode = search(new NodeStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
+            SearchResponse firstState = search(new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
+
+            ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
+            final String masterNodeId = clusterStateResponse.getState().getNodes().getMasterNodeId();
+
+            // Verify some stuff about the stuff in the backwards compatibility indexes
+            Arrays.stream(firstIndexStats.getHits().hits()).forEach(hit -> checkIndexStats(version, hit.sourceAsMap()));
+            Arrays.stream(firstShards.getHits().hits()).forEach(hit -> checkShards(version, hit.sourceAsMap()));
+            Arrays.stream(firstIndices.getHits().hits()).forEach(hit -> checkIndicesStats(version, hit.sourceAsMap()));
+            Arrays.stream(firstNode.getHits().hits()).forEach(hit -> checkNodeStats(version, masterNodeId, hit.sourceAsMap()));
+            Arrays.stream(firstState.getHits().hits()).forEach(hit -> checkClusterState(version, hit.sourceAsMap()));
+
+            // Wait for monitoring to accumulate some data about the current cluster
+            long indexStatsCount = firstIndexStats.getHits().totalHits();
+            assertBusy(() -> search(new IndexStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
+                    greaterThan(indexStatsCount)), 1, TimeUnit.MINUTES);
+            assertBusy(() -> search(new ShardsResolver(MonitoredSystem.ES, Settings.EMPTY),
+                    greaterThan(firstShards.getHits().totalHits())), 1, TimeUnit.MINUTES);
+            assertBusy(() -> search(new IndicesStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
+                    greaterThan(firstIndices.getHits().totalHits())), 1, TimeUnit.MINUTES);
+            assertBusy(() -> search(new NodeStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
+                    greaterThan(firstNode.getHits().totalHits())), 1, TimeUnit.MINUTES);
+            assertBusy(() -> search(new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY),
+                    greaterThan(firstState.getHits().totalHits())), 1, TimeUnit.MINUTES);
+        } finally {
+            /* Now we stop monitoring and disable the HTTP exporter. We also delete all data and checks multiple times
+               if they have not been re created by some in flight monitoring bulk request */
+            internalCluster().getInstances(AgentService.class).forEach(AgentService::stop);
+
+            Settings.Builder settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1");
+            if (httpExporter) {
+                logger.info("--> Disabling http exporter after test");
+                setupHttpExporter(settings, null);
+            }
+            assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
+
+            CountDown retries = new CountDown(5);
+            assertBusy(() -> {
+                String[] indices = new String[]{".marvel-*", ".monitoring-*"};
+                IndicesExistsResponse existsResponse = client().admin().indices().prepareExists(indices).get();
+                if (existsResponse.isExists()) {
+                    assertAcked(client().admin().indices().prepareDelete(indices));
+                } else {
+                    retries.countDown();
+                }
+                assertThat(retries.isCountedDown(), is(true));
+            });
         }
-
-        /* Indexes created from 2.3 onwards get aliased to the place they'd be if they were created in 5.0 so queries should just work.
-         * Monitoring doesn't really have a Java API so we can't test that, but we can test that we write the data we expected to write. */
-        if (httpExporter) {
-            // If we're using the http exporter we need feed it the port and enable it
-            NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().get();
-            TransportAddress publishAddress = nodeInfos.getNodes().get(0).getHttp().address().publishAddress();
-            InetSocketAddress address = publishAddress.address();
-            Settings.Builder settings = Settings.builder();
-            setupHttpExporter(settings, address.getPort());
-            logger.info("--> Enabling http exporter pointing to [localhost:{}]", address.getPort());
-            client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get();
-        }
-
-        IndexStatsResolver resolver = new IndexStatsResolver(MonitoredSystem.ES, Settings.EMPTY);
-        logger.info("--> {} Waiting for [{}]", Thread.currentThread().getName(), resolver.indexPattern());
-        assertBusy(() -> assertTrue(client().admin().indices().prepareExists(resolver.indexPattern()).get().isExists()));
-
-        // Slow down monitoring from its previously super fast pace so we can shut down without trouble
-        Settings.Builder settings = Settings.builder()
-                .put(MonitoringSettings.INTERVAL.getKey(), timeValueSeconds(10).getStringRep());
-        assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
-
-        SearchResponse firstIndexStats = search(resolver, greaterThanOrEqualTo(10L));
-
-        // All the other aliases should have been created by now so we can assert that we have the data we saved in the bwc indexes
-        SearchResponse firstShards = search(new ShardsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(10L));
-        SearchResponse firstIndicesStats = search(new IndicesStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
-        SearchResponse firstNodeStats = search(new NodeStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
-        SearchResponse firstClusterState = search(new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
-
-        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
-        final String masterNodeId = clusterStateResponse.getState().getNodes().getMasterNodeId();
-
-        // Verify some stuff about the stuff in the backwards compatibility indexes
-        Arrays.stream(firstIndexStats.getHits().hits()).forEach(hit -> checkIndexStats(version, hit.sourceAsMap()));
-        Arrays.stream(firstShards.getHits().hits()).forEach(hit -> checkShards(version, hit.sourceAsMap()));
-        Arrays.stream(firstIndicesStats.getHits().hits()).forEach(hit -> checkIndicesStats(version, hit.sourceAsMap()));
-        Arrays.stream(firstNodeStats.getHits().hits()).forEach(hit -> checkNodeStats(version, masterNodeId, hit.sourceAsMap()));
-        Arrays.stream(firstClusterState.getHits().hits()).forEach(hit -> checkClusterState(version, hit.sourceAsMap()));
-
-        // Wait for monitoring to accumulate some data about the current cluster
-        long indexStatsCount = firstIndexStats.getHits().totalHits();
-        assertBusy(() -> search(new IndexStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
-                greaterThan(indexStatsCount)), 1, TimeUnit.MINUTES);
-        assertBusy(() -> search(new ShardsResolver(MonitoredSystem.ES, Settings.EMPTY),
-                greaterThan(firstShards.getHits().totalHits())), 1, TimeUnit.MINUTES);
-        assertBusy(() -> search(new IndicesStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
-                greaterThan(firstIndicesStats.getHits().totalHits())), 1, TimeUnit.MINUTES);
-        assertBusy(() -> search(new NodeStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
-                greaterThan(firstNodeStats.getHits().totalHits())), 1, TimeUnit.MINUTES);
-        assertBusy(() -> search(new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY),
-                greaterThan(firstClusterState.getHits().totalHits())), 1, TimeUnit.MINUTES);
     }
 
     private SearchResponse search(MonitoringIndexNameResolver<?> resolver, Matcher<Long> hitCount) {