[Monitoring] Add Rolling Upgrade Tests (elastic/x-pack-elasticsearch#2832)

This adds a rolling upgrade test for X-Pack monitoring. It works by using the `_xpack/monitoring/_bulk` endpoint to send arbitrary data, then verify that it exists.

This forces a few things to happen, thereby testing the behavior: 

1. The templates must exist.
2. The elected master node must be "ready" to work (hence the first
point).
3. The same "system_api_version" is accepted by every version of ES.

Original commit: elastic/x-pack-elasticsearch@012e5738bb
This commit is contained in:
Chris Earle 2017-11-09 12:49:37 -05:00 committed by GitHub
parent 4d265868b8
commit efb5b8827b
10 changed files with 231 additions and 494 deletions

View File

@ -129,11 +129,9 @@ public class HttpExporter extends Exporter {
public static final String PIPELINE_CHECK_TIMEOUT_SETTING = "index.pipeline.master_timeout"; public static final String PIPELINE_CHECK_TIMEOUT_SETTING = "index.pipeline.master_timeout";
/** /**
* Minimum supported version of the remote monitoring cluster. * Minimum supported version of the remote monitoring cluster (same major).
* <p>
* We must have support for the latest template syntax (index_patterns), which requires a minimum of 6.0.
*/ */
public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_6_0_0_alpha1; public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_7_0_0_alpha1;
/** /**
* The {@link RestClient} automatically pools connections and keeps them alive as necessary. * The {@link RestClient} automatically pools connections and keeps them alive as necessary.

View File

@ -1,195 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.license.PutLicenseAction;
import org.elasticsearch.license.PutLicenseRequest;
import org.elasticsearch.license.PutLicenseResponse;
import org.elasticsearch.license.TestUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.VersionUtils;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.elasticsearch.test.OldIndexUtils.copyIndex;
import static org.elasticsearch.test.OldIndexUtils.loadDataFilesList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/**
* Base class for tests against clusters coming from old versions of xpack and Elasticsearch.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) // We'll start the nodes manually
public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase extends SecurityIntegTestCase {
/**
* Set to true when it is ok to start a node. We don't want to start nodes at unexpected times.
*/
private boolean okToStartNode = false;
private List<String> dataFiles;
@Override
protected final boolean ignoreExternalCluster() {
return true;
}
@Override
protected boolean shouldAssertXPackIsInstalled() {
return false; // Skip asserting that the xpack is installed because it tries to start the cluter.
}
@Override
protected void ensureClusterSizeConsistency() {
// We manage our nodes ourselves. At this point no node should be running anyway and this would start a new one!
}
@Override
protected void ensureClusterStateConsistency() throws IOException {
// We manage our nodes ourselves. At this point no node should be running anyway and this would start a new one!
}
@Before
public final void initIndexesList() throws Exception {
dataFiles = loadDataFilesList("x-pack", getBwcIndicesPath());
}
@Override
public Settings nodeSettings(int ord) {
if (false == okToStartNode) {
throw new IllegalStateException("Starting nodes must only happen in setupCluster");
}
// speed up recoveries
return Settings.builder()
.put(super.nodeSettings(ord))
.put(ThrottlingAllocationDecider
.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30)
.put(ThrottlingAllocationDecider
.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30)
.build();
}
@Override
protected int maxNumberOfNodes() {
try {
return SecurityIntegTestCase.defaultMaxNumberOfNodes() + loadDataFilesList("x-pack", getBwcIndicesPath()).size();
} catch (IOException e) {
throw new RuntimeException("couldn't enumerate bwc indices", e);
}
}
public void testAllVersionsTested() throws Exception {
SortedSet<String> expectedVersions = new TreeSet<>();
for (Version v : VersionUtils.allReleasedVersions()) {
if (v.isRelease()) {
// no guarantees for prereleases
expectedVersions.add("x-pack-" + v.toString() + ".zip");
}
}
expectedVersions.removeAll(dataFiles);
if (expectedVersions.isEmpty() == false) {
StringBuilder msg = new StringBuilder("Old index tests are missing indexes:");
for (String expected : expectedVersions) {
msg.append("\n" + expected);
}
fail(msg.toString());
}
}
public void testOldIndexes() throws Exception {
assertSecurityIndexWriteable();
Collections.shuffle(dataFiles, random());
for (String dataFile : dataFiles) {
Version version = Version.fromString(dataFile.replace("x-pack-", "").replace(".zip", ""));
long clusterStartTime = System.nanoTime();
setupCluster(dataFile);
ensureYellow();
long testStartTime = System.nanoTime();
try {
checkVersion(version);
} catch (Throwable t) {
throw new AssertionError("Failed while checking [" + version + "]", t);
}
logger.info("--> Done testing [{}]. Setting up cluster took [{}] millis and testing took [{}] millis", version,
TimeUnit.NANOSECONDS.toMillis(testStartTime - clusterStartTime),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - testStartTime));
}
}
/**
* Actually test this version.
*/
protected abstract void checkVersion(Version version) throws Exception;
private void setupCluster(String pathToZipFile) throws Exception {
// shutdown any nodes from previous zip files
while (internalCluster().size() > 0) {
internalCluster().stopRandomNode(s -> true);
}
// first create the data directory and unzip the data there
// we put the whole cluster state and indexes because if we only copy indexes and import them as dangling then
// the native realm services will start because there is no security index and nothing is recovering
// but we want them to not start!
Path dataPath = createTempDir();
Settings.Builder nodeSettings = Settings.builder()
.put("path.data", dataPath.toAbsolutePath());
// unzip data
Path backwardsIndex = getBwcIndicesPath().resolve(pathToZipFile);
// decompress the index
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
logger.info("unzipping {}", backwardsIndex.toString());
TestUtil.unzip(stream, dataPath);
// now we need to copy the whole thing so that it looks like an actual data path
try (Stream<Path> unzippedFiles = Files.list(dataPath.resolve("data"))) {
Path dataDir = unzippedFiles.findFirst().get();
// this is not actually an index but the copy does the job anyway
int zipIndex = pathToZipFile.indexOf(".zip");
Version version = Version.fromString(pathToZipFile.substring("x-pack-".length(), zipIndex));
if (version.before(Version.V_5_0_0_alpha1)) {
// the bwc scripts packs the indices under this path before 5.0
dataDir = dataDir.resolve("nodes");
}
copyIndex(logger, dataDir, "nodes", dataPath);
// remove the original unzipped directory
}
IOUtils.rm(dataPath.resolve("data"));
}
// check it is unique
assertTrue(Files.exists(dataPath));
Path[] list = FileSystemUtils.files(dataPath);
if (list.length != 1) {
throw new IllegalStateException("Backwards index must contain exactly one node");
}
// start the node
logger.info("--> Data path for importing node: {}", dataPath);
okToStartNode = true;
String importingNodeName = internalCluster().startNode(nodeSettings.build());
okToStartNode = false;
Path[] nodePaths = internalCluster().getInstance(NodeEnvironment.class, importingNodeName).nodeDataPaths();
assertEquals(1, nodePaths.length);
PutLicenseResponse putLicenseResponse = client().execute(PutLicenseAction.INSTANCE,
new PutLicenseRequest().license(TestUtils.generateSignedLicense("platinum", TimeValue.timeValueHours(24L)))).get();
assertAcked(putLicenseResponse);
}
}

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.test; package org.elasticsearch.test;
import org.elasticsearch.AbstractOldXPackIndicesBackwardsCompatibilityTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -183,9 +182,6 @@ public abstract class SecurityIntegTestCase extends ESIntegTestCase {
@Before @Before
//before methods from the superclass are run before this, which means that the current cluster is ready to go //before methods from the superclass are run before this, which means that the current cluster is ready to go
public void assertXPackIsInstalled() { public void assertXPackIsInstalled() {
if (false == shouldAssertXPackIsInstalled()) {
return;
}
NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().clear().setPlugins(true).get(); NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().clear().setPlugins(true).get();
for (NodeInfo nodeInfo : nodeInfos.getNodes()) { for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
// TODO: disable this assertion for now, due to random runs with mock plugins. perhaps run without mock plugins? // TODO: disable this assertion for now, due to random runs with mock plugins. perhaps run without mock plugins?
@ -197,17 +193,6 @@ public abstract class SecurityIntegTestCase extends ESIntegTestCase {
} }
} }
/**
* Should this test assert that x-pack is installed? You might want to skip this assertion if the test itself validates the installation
* <strong>and</strong> running the assertion would significantly affect the performance or function of the test. For example
* {@link AbstractOldXPackIndicesBackwardsCompatibilityTestCase} disables the assertion because the assertion would force starting a
* node which it will then just shut down. That would slow the test down significantly and causes spurious failures because watcher
* needs to be shut down with kid gloves.
*/
protected boolean shouldAssertXPackIsInstalled() {
return true;
}
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)) Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal))

View File

@ -1,280 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.AbstractOldXPackIndicesBackwardsCompatibilityTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.node.NodeStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.shards.ShardMonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
import org.hamcrest.Matcher;
import org.joda.time.format.DateTimeFormat;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
/**
* Tests for monitoring indexes created before {@link Version#CURRENT}.
*/
//Give ourselves 30 seconds instead of 5 to shut down. Sometimes it takes a while, especially on weak hardware. But we do get there.
@ThreadLeakLingering(linger = 30000)
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/4314")
public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOldXPackIndicesBackwardsCompatibilityTestCase {
private final boolean httpExporter = randomBoolean();
@Override
public Settings nodeSettings(int ord) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(ord))
.put(XPackSettings.MONITORING_ENABLED.getKey(), true)
// Don't clean old monitoring indexes - we want to make sure we can load them
.put(Monitoring.HISTORY_DURATION.getKey(), TimeValue.timeValueHours(1000 * 365 * 24).getStringRep())
// Do not start monitoring exporters at startup
.put(MonitoringService.INTERVAL.getKey(), "-1");
if (httpExporter) {
/* If we want to test the http exporter we have to create it but disable it. We need to create it so we don't use the default
* local exporter and we have to disable it because we don't yet know the port we've bound to. We can only get that once
* Elasticsearch starts so we'll enable the exporter then. */
settings.put(NetworkModule.HTTP_ENABLED.getKey(), true);
setupHttpExporter(settings, null);
}
return settings.build();
}
private void setupHttpExporter(Settings.Builder settings, Integer port) {
Map<String, String> httpExporter = new HashMap<>();
httpExporter.put("type", "http");
httpExporter.put("enabled", port == null ? "false" : "true");
httpExporter.put("host", "http://localhost:" + (port == null ? "does_not_matter" : port));
httpExporter.put("auth.username", SecuritySettingsSource.TEST_USER_NAME);
httpExporter.put("auth.password", SecuritySettingsSource.TEST_PASSWORD);
settings.putProperties(httpExporter, k -> Exporters.EXPORTERS_SETTINGS.getKey() + "my_exporter." + k);
}
@Override
protected void checkVersion(Version version) throws Exception {
try {
logger.info("--> Start testing version [{}]", version);
if (httpExporter) {
// If we're using the http exporter we need to update the port and enable it
NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().get();
TransportAddress publishAddress = nodeInfos.getNodes().get(0).getHttp().address().publishAddress();
InetSocketAddress address = publishAddress.address();
Settings.Builder settings = Settings.builder();
setupHttpExporter(settings, address.getPort());
logger.info("--> Enabling http exporter pointing to [localhost:{}]", address.getPort());
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
}
// Monitoring can now start to collect new data
Settings.Builder settings = Settings.builder().put(MonitoringService.INTERVAL.getKey(), timeValueSeconds(3).getStringRep());
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
final String prefix = ".monitoring-" + MonitoredSystem.ES.getSystem() + "-" + TEMPLATE_VERSION + "-";
// And we wait until data have been indexed locally using either by the local or http exporter
final String dateTime = DateTimeFormat.forPattern("YYYY.MM.dd").print(System.currentTimeMillis());
final String expectedIndex = prefix + dateTime;
final String indexPattern = prefix + "*";
logger.info("--> {} Waiting for [{}] to be ready", Thread.currentThread().getName(), expectedIndex);
assertBusy(() -> {
assertTrue(client().admin().indices().prepareExists(expectedIndex).get().isExists());
NumShards numShards = getNumShards(expectedIndex);
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth(expectedIndex)
.setWaitForActiveShards(numShards.numPrimaries)
.get();
assertThat(clusterHealth.getIndices().get(expectedIndex).getActivePrimaryShards(), equalTo(numShards.numPrimaries));
});
SearchResponse firstIndexStats =
search(indexPattern, IndexStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(10L));
// All the other aliases should have been created by now so we can assert that we have the data we saved in the bwc indexes
SearchResponse firstShards = search(indexPattern, ShardMonitoringDoc.TYPE, greaterThanOrEqualTo(10L));
SearchResponse firstIndices = search(indexPattern, IndicesStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
SearchResponse firstNode = search(indexPattern, NodeStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
SearchResponse firstState = search(indexPattern, ClusterStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
final String masterNodeId = clusterStateResponse.getState().getNodes().getMasterNodeId();
// Verify some stuff about the stuff in the backwards compatibility indexes
Arrays.stream(firstIndexStats.getHits().getHits()).forEach(hit -> checkIndexStats(version, hit.getSourceAsMap()));
Arrays.stream(firstShards.getHits().getHits()).forEach(hit -> checkShards(version, hit.getSourceAsMap()));
Arrays.stream(firstIndices.getHits().getHits()).forEach(hit -> checkIndicesStats(version, hit.getSourceAsMap()));
Arrays.stream(firstNode.getHits().getHits()).forEach(hit -> checkNodeStats(version, masterNodeId, hit.getSourceAsMap()));
Arrays.stream(firstState.getHits().getHits()).forEach(hit -> checkClusterState(version, hit.getSourceAsMap()));
// Create some docs
indexRandom(true, client().prepareIndex("test-1", "doc", "1").setSource("field", 1),
client().prepareIndex("test-2", "doc", "2").setSource("field", 2));
// Wait for monitoring to accumulate some data about the current cluster
long indexStatsCount = firstIndexStats.getHits().getTotalHits();
assertBusy(() -> search(indexPattern, IndexStatsMonitoringDoc.TYPE,
greaterThan(indexStatsCount)), 1, TimeUnit.MINUTES);
assertBusy(() -> search(indexPattern, ShardMonitoringDoc.TYPE,
greaterThan(firstShards.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(indexPattern, IndicesStatsMonitoringDoc.TYPE,
greaterThan(firstIndices.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(indexPattern, NodeStatsMonitoringDoc.TYPE,
greaterThan(firstNode.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(indexPattern, ClusterStatsMonitoringDoc.TYPE,
greaterThan(firstState.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
} finally {
/* Now we stop monitoring and disable the HTTP exporter. We also delete all data and checks multiple times
if they have not been re created by some in flight monitoring bulk request */
internalCluster().getInstances(MonitoringService.class).forEach(MonitoringService::stop);
Settings.Builder settings = Settings.builder().put(MonitoringService.INTERVAL.getKey(), "-1");
if (httpExporter) {
logger.info("--> Disabling http exporter after test");
setupHttpExporter(settings, null);
}
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
logger.info("--> Waiting for indices deletion");
CountDown retries = new CountDown(10);
assertBusy(() -> {
String[] indices = new String[]{".monitoring-*"};
IndicesExistsResponse existsResponse = client().admin().indices().prepareExists(indices).get();
if (existsResponse.isExists()) {
assertAcked(client().admin().indices().prepareDelete(indices));
} else {
retries.countDown();
}
assertThat(retries.isCountedDown(), is(true));
});
logger.info("--> End testing version [{}]", version);
}
}
private SearchResponse search(String indexPattern, String type, Matcher<Long> hitCount) {
SearchResponse response = client().prepareSearch(indexPattern).setTypes(type).get();
assertThat(response.getHits().getTotalHits(), hitCount);
return response;
}
private void checkIndexStats(final Version version, Map<String, Object> indexStats) {
checkMonitoringElement(indexStats);
checkSourceNode(version, indexStats);
Map<?, ?> stats = (Map<?, ?>) indexStats.get("index_stats");
assertThat(stats, hasKey("index"));
Map<?, ?> total = (Map<?, ?>) stats.get("total");
Map<?, ?> docs = (Map<?, ?>) total.get("docs");
// These might have been taken before all the documents were added so we can't assert a whole lot about the number
assertThat((Integer) docs.get("count"), greaterThanOrEqualTo(0));
}
private void checkShards(final Version version, Map<String, Object> shards) {
checkMonitoringElement(shards);
Map<?, ?> shard = (Map<?, ?>) shards.get("shard");
assertThat(shard, allOf(hasKey("index"), hasKey("state"), hasKey("primary"), hasKey("node")));
}
private void checkIndicesStats(final Version version, Map<String, Object> indicesStats) {
checkMonitoringElement(indicesStats);
checkSourceNode(version, indicesStats);
Map<?, ?> stats = (Map<?, ?>) indicesStats.get("indices_stats");
Map<?, ?> all = (Map<?, ?>) stats.get("_all");
Map<?, ?> primaries = (Map<?, ?>) all.get("primaries");
Map<?, ?> docs = (Map<?, ?>) primaries.get("docs");
// These might have been taken before all the documents were added so we can't assert a whole lot about the number
assertThat((Integer) docs.get("count"), greaterThanOrEqualTo(0));
}
private void checkNodeStats(final Version version, final String masterNodeId, Map<String, Object> nodeStats) {
checkMonitoringElement(nodeStats);
checkSourceNode(version, nodeStats);
Map<?, ?> stats = (Map<?, ?>) nodeStats.get("node_stats");
// Those fields are expected in every node stats documents
Set<String> mandatoryKeys = new HashSet<>();
mandatoryKeys.add("node_id");
mandatoryKeys.add("node_master");
mandatoryKeys.add("mlockall");
mandatoryKeys.add("indices");
mandatoryKeys.add("os");
mandatoryKeys.add("fs");
mandatoryKeys.add("process");
mandatoryKeys.add("jvm");
mandatoryKeys.add("thread_pool");
// disk_threshold_* fields have been removed in 5.0 alpha5, we only check for them if the
// current tested version is less than or equal to alpha4. Also, the current master node
// might have collected its own node stats through the Monitoring plugin, and since it is
// running under Version.CURRENT there's no chance to find these fields.
if (version.onOrBefore(Version.V_5_0_0_alpha4)) {
if (masterNodeId.equals((String) stats.get("node_id")) == false) {
mandatoryKeys.add("disk_threshold_enabled");
mandatoryKeys.add("disk_threshold_watermark_high");
}
}
for (String key : mandatoryKeys) {
assertThat("Expecting [" + key + "] to be present for bwc index in version [" + version + "]", stats, hasKey(key));
}
Set<?> keys = new HashSet<>(stats.keySet());
keys.removeAll(mandatoryKeys);
assertTrue("Found unexpected fields [" + Strings.collectionToCommaDelimitedString(keys) + "] " +
"for bwc index in version [" + version + "]", keys.isEmpty());
}
private void checkClusterState(final Version version, Map<String, Object> clusterState) {
checkMonitoringElement(clusterState);
checkSourceNode(version, clusterState);
Map<?, ?> stats = (Map<?, ?>) clusterState.get("cluster_state");
assertThat(stats, allOf(hasKey("status"), hasKey("version"), hasKey("state_uuid"), hasKey("master_node"), hasKey("nodes")));
}
private void checkMonitoringElement(Map<String, Object> element) {
assertThat(element, allOf(hasKey("cluster_uuid"), hasKey("timestamp")));
}
private void checkSourceNode(final Version version, Map<String, Object> element) {
assertThat(element, hasKey("source_node"));
}
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.test.StreamsUtils;
import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.common.text.TextTemplate; import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.security.SecurityClusterClientYamlTestCase; import org.elasticsearch.xpack.security.SecurityClusterClientYamlTestCase;
import org.elasticsearch.xpack.security.support.IndexLifecycleManager; import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
import org.elasticsearch.xpack.test.rest.XPackRestTestCase; import org.elasticsearch.xpack.test.rest.XPackRestTestCase;
@ -175,6 +176,22 @@ public class FullClusterRestartIT extends ESRestTestCase {
} }
} }
@SuppressWarnings("unchecked")
public void testMonitoring() throws Exception {
waitForYellow(".monitoring-es-*");
if (runningAgainstOldCluster == false) {
waitForMonitoringTemplates();
}
// ensure that monitoring [re]starts and creates the core monitoring document, cluster_stats, for the current cluster
final Map<String, Object> response = toMap(client().performRequest("GET", "/"));
final Map<String, Object> version = (Map<String, Object>) response.get("version");
final String expectedVersion = (String) version.get("number");
waitForClusterStats(expectedVersion);
}
public void testWatcher() throws Exception { public void testWatcher() throws Exception {
if (runningAgainstOldCluster) { if (runningAgainstOldCluster) {
logger.info("Adding a watch on old cluster"); logger.info("Adding a watch on old cluster");
@ -364,6 +381,43 @@ public class FullClusterRestartIT extends ESRestTestCase {
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
} }
@SuppressWarnings("unchecked")
private void waitForMonitoringTemplates() throws Exception {
assertBusy(() -> {
final Map<String, Object> templates = toMap(client().performRequest("GET", "/_template/.monitoring-*"));
// in earlier versions, we published legacy templates in addition to the current ones to support transitioning
assertThat(templates.size(), greaterThanOrEqualTo(MonitoringTemplateUtils.TEMPLATE_IDS.length));
// every template should be updated to whatever the current version is
for (final String templateId : MonitoringTemplateUtils.TEMPLATE_IDS) {
final String templateName = MonitoringTemplateUtils.templateName(templateId);
final Map<String, Object> template = (Map<String, Object>) templates.get(templateName);
assertThat(template.get("version"), is(MonitoringTemplateUtils.LAST_UPDATED_VERSION));
}
}, 30, TimeUnit.SECONDS);
}
@SuppressWarnings("unchecked")
private void waitForClusterStats(final String expectedVersion) throws Exception {
assertBusy(() -> {
final Map<String, String> params = new HashMap<>(3);
params.put("q", "type:cluster_stats");
params.put("size", "1");
params.put("sort", "timestamp:desc");
final Map<String, Object> response = toMap(client().performRequest("GET", "/.monitoring-es-*/_search", params));
final Map<String, Object> hits = (Map<String, Object>) response.get("hits");
assertThat("No cluster_stats documents found.", (int)hits.get("total"), greaterThanOrEqualTo(1));
final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0);
final Map<String, Object> source = (Map<String, Object>) hit.get("_source");
assertThat(source.get("version"), is(expectedVersion));
}, 30, TimeUnit.SECONDS);
}
static Map<String, Object> toMap(Response response) throws IOException { static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity())); return toMap(EntityUtils.toString(response.getEntity()));
} }

View File

@ -122,6 +122,11 @@ subprojects {
minimumMasterNodes = { 2 } minimumMasterNodes = { 2 }
clusterName = 'rolling-upgrade' clusterName = 'rolling-upgrade'
waitCondition = waitWithAuth waitCondition = waitWithAuth
setting 'xpack.monitoring.collection.interval', '-1'
setting 'xpack.monitoring.exporters._http.type', 'http'
setting 'xpack.monitoring.exporters._http.enabled', 'false'
setting 'xpack.monitoring.exporters._http.auth.username', 'test_user'
setting 'xpack.monitoring.exporters._http.auth.password', 'x-pack-test-password'
setting 'xpack.security.transport.ssl.enabled', 'true' setting 'xpack.security.transport.ssl.enabled', 'true'
setting 'xpack.security.authc.token.enabled', 'true' setting 'xpack.security.authc.token.enabled', 'true'
setting 'xpack.ssl.keystore.path', 'testnode.jks' setting 'xpack.ssl.keystore.path', 'testnode.jks'
@ -161,6 +166,11 @@ subprojects {
minimumMasterNodes = { 2 } minimumMasterNodes = { 2 }
dataDir = { nodeNumber -> oldClusterTest.nodes[1].dataDir } dataDir = { nodeNumber -> oldClusterTest.nodes[1].dataDir }
waitCondition = waitWithAuth waitCondition = waitWithAuth
setting 'xpack.monitoring.collection.interval', '-1'
setting 'xpack.monitoring.exporters._http.type', 'http'
setting 'xpack.monitoring.exporters._http.enabled', 'false'
setting 'xpack.monitoring.exporters._http.auth.username', 'test_user'
setting 'xpack.monitoring.exporters._http.auth.password', 'x-pack-test-password'
setting 'xpack.security.transport.ssl.enabled', 'true' setting 'xpack.security.transport.ssl.enabled', 'true'
setting 'xpack.ssl.keystore.path', 'testnode.jks' setting 'xpack.ssl.keystore.path', 'testnode.jks'
keystoreSetting 'xpack.ssl.keystore.secure_password', 'testnode' keystoreSetting 'xpack.ssl.keystore.secure_password', 'testnode'
@ -194,6 +204,11 @@ subprojects {
minimumMasterNodes = { 2 } minimumMasterNodes = { 2 }
dataDir = { nodeNumber -> oldClusterTest.nodes[0].dataDir } dataDir = { nodeNumber -> oldClusterTest.nodes[0].dataDir }
waitCondition = waitWithAuth waitCondition = waitWithAuth
setting 'xpack.monitoring.collection.interval', '-1'
setting 'xpack.monitoring.exporters._http.type', 'http'
setting 'xpack.monitoring.exporters._http.enabled', 'false'
setting 'xpack.monitoring.exporters._http.auth.username', 'test_user'
setting 'xpack.monitoring.exporters._http.auth.password', 'x-pack-test-password'
setting 'xpack.security.transport.ssl.enabled', 'true' setting 'xpack.security.transport.ssl.enabled', 'true'
setting 'xpack.ssl.keystore.path', 'testnode.jks' setting 'xpack.ssl.keystore.path', 'testnode.jks'
keystoreSetting 'xpack.ssl.keystore.secure_password', 'testnode' keystoreSetting 'xpack.ssl.keystore.secure_password', 'testnode'

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry; import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.security.SecurityClusterClientYamlTestCase; import org.elasticsearch.xpack.security.SecurityClusterClientYamlTestCase;
import org.elasticsearch.xpack.test.rest.XPackRestTestCase; import org.elasticsearch.xpack.test.rest.XPackRestTestCase;
@ -20,6 +21,13 @@ import org.junit.Before;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Base64; import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.is;
@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs @TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs
public class UpgradeClusterClientYamlTestSuiteIT extends SecurityClusterClientYamlTestCase { public class UpgradeClusterClientYamlTestSuiteIT extends SecurityClusterClientYamlTestCase {
@ -32,6 +40,34 @@ public class UpgradeClusterClientYamlTestSuiteIT extends SecurityClusterClientYa
XPackRestTestCase.waitForMlTemplates(); XPackRestTestCase.waitForMlTemplates();
} }
/**
* Enables an HTTP exporter for monitoring so that we can test the production-level exporter (not the local exporter).
*
* The build.gradle file disables data collection, so the expectation is that any monitoring rest tests will use the
* "_xpack/monitoring/_bulk" endpoint to lazily setup the templates on-demand and fill in data without worrying about
* timing.
*/
@Before
public void waitForMonitoring() throws Exception {
final String[] nodes = System.getProperty("tests.rest.cluster").split(",");
final Map<String, Object> settings = new HashMap<>();
settings.put("xpack.monitoring.exporters._http.enabled", true);
// only select the last node to avoid getting the "old" node in a mixed cluster
// if we ever randomize the order that the nodes are restarted (or add more nodes), then we need to verify which node we select
settings.put("xpack.monitoring.exporters._http.host", nodes[nodes.length - 1]);
assertBusy(() -> {
final ClientYamlTestResponse response =
getAdminExecutionContext().callApi("cluster.put_settings",
emptyMap(),
singletonList(singletonMap("transient", settings)),
emptyMap());
assertThat(response.evaluate("acknowledged"), is(true));
});
}
@Override @Override
protected boolean preserveIndicesUponCompletion() { protected boolean preserveIndicesUponCompletion() {
return true; return true;

View File

@ -0,0 +1,44 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
---
"Index monitoring data and search on the mixed cluster":
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "old_cluster" } } }
- match: { hits.total: 2 }
- do:
xpack.monitoring.bulk:
system_id: "kibana"
system_api_version: "6"
interval: "123456ms"
type: "mixed_cluster"
body:
- '{"index": {}}'
- '{"field": "value_3"}'
- '{"index": {}}'
- '{"field": "value_4"}'
- '{"index": {}}'
- '{"field": "value_5"}'
- is_false: errors
- do:
indices.refresh: {}
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "old_cluster" } } }
- match: { hits.total: 2 }
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "mixed_cluster" } } }
- match: { hits.total: 3 }

View File

@ -0,0 +1,30 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
---
"Index monitoring data and search on the old cluster":
- do:
xpack.monitoring.bulk:
system_id: "kibana"
system_api_version: "6"
interval: "123456ms"
type: "old_cluster"
body:
- '{"index": {}}'
- '{"field": "value_1"}'
- '{"index": {}}'
- '{"field": "value_2"}'
- is_false: errors
- do:
indices.refresh: {}
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "old_cluster" } } }
- match: { hits.total: 2 }

View File

@ -0,0 +1,50 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
---
"Index monitoring data and search on the upgraded cluster":
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "old_cluster" } } }
- match: { hits.total: 2 }
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "mixed_cluster" } } }
- match: { hits.total: 3 }
- do:
xpack.monitoring.bulk:
system_id: "kibana"
system_api_version: "6"
interval: "123456ms"
type: "upgraded_cluster"
body:
- '{"index": {}}'
- '{"field": "value_6"}'
- '{"index": {}}'
- '{"field": "value_7"}'
- '{"index": {}}'
- '{"field": "value_8"}'
- is_false: errors
- do:
indices.refresh: {}
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "terms" : { "type": [ "old_cluster", "mixed_cluster" ] } } }
- match: { hits.total: 5 }
- do:
search:
index: .monitoring-kibana-*
body: { "query": { "term" : { "type": "upgraded_cluster" } } }
- match: { hits.total: 3 }