Add backwards compatibility support to monitoring

1. We only support indexes created by Marvel 2.3+. All other indexes
are just ignored.
2. The tests don't assert a ton of interesting stuff because there
isn't a java API for Monitoring that we can just use. Instead we assert
that a few objects are there and look sane.
3. We don't migrate the contents of the data index. Instead we just
rely on Monitoring recreating it.

Original commit: elastic/x-pack-elasticsearch@86216c2d61
This commit is contained in:
Nik Everett 2016-09-06 14:30:35 -04:00
parent a6d55f26c6
commit c21a922778
27 changed files with 595 additions and 89 deletions

View File

@ -293,15 +293,7 @@ def generate_watcher_index(client, version):
# wait to accumulate some watches
logging.info('Waiting for watch results index to fill up...')
for attempt in range(1, 31):
try:
response = client.search(index="bwc_watch_index", body={"query": {"match_all": {}}})
logging.info('(' + str(attempt) + ') Got ' + str(response['hits']['total']) + ' hits and want 10...')
if response['hits']['total'] >= 10:
break
except NotFoundError:
logging.info('(' + str(attempt) + ') Not found, retrying')
time.sleep(1)
wait_for_search(10, lambda: client.search(index="bwc_watch_index", body={"query": {"match_all": {}}}))
health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.watches')
assert health['timed_out'] == False, 'cluster health timed out %s' % health
@ -310,6 +302,35 @@ def generate_watcher_index(client, version):
health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='bwc_watch_index')
assert health['timed_out'] == False, 'cluster health timed out %s' % health
def wait_for_monitoring_index_to_fill(client, version):
logging.info('Waiting for marvel to index the cluster_info...')
wait_for_search(1, lambda: client.search(index=".marvel-es-*", doc_type="cluster_info", body={"query": {"match_all": {}}}))
if parse_version(version) >= parse_version('2.1.0'):
logging.info('Waiting for marvel to index the node information...')
wait_for_search(1, lambda: client.search(index=".marvel-es-*", doc_type="node", body={"query": {"match_all": {}}}))
logging.info('Waiting for marvel index to get enough index_stats...')
wait_for_search(10, lambda: client.search(index=".marvel-es-*", doc_type="index_stats", body={"query": {"match_all": {}}}))
logging.info('Waiting for marvel index to get enough shards...')
wait_for_search(10, lambda: client.search(index=".marvel-es-*", doc_type="shards", body={"query": {"match_all": {}}}))
logging.info('Waiting for marvel index to get enough indices_stats...')
wait_for_search(3, lambda: client.search(index=".marvel-es-*", doc_type="indices_stats", body={"query": {"match_all": {}}}))
logging.info('Waiting for marvel index to get enough node_stats...')
wait_for_search(3, lambda: client.search(index=".marvel-es-*", doc_type="node_stats", body={"query": {"match_all": {}}}))
logging.info('Waiting for marvel index to get enough cluster_state...')
wait_for_search(3, lambda: client.search(index=".marvel-es-*", doc_type="cluster_state", body={"query": {"match_all": {}}}))
def wait_for_search(required_count, searcher):
for attempt in range(1, 31):
try:
response = searcher()
logging.info('(' + str(attempt) + ') Got ' + str(response['hits']['total']) + ' hits and want ' + str(required_count) + '...')
if response['hits']['total'] >= required_count:
return
except NotFoundError:
logging.info('(' + str(attempt) + ') Not found, retrying')
time.sleep(1)
logger.error("Ran out of retries")
raise "Ran out of retries"
def compress_index(version, tmp_dir, output_dir):
compress(tmp_dir, output_dir, 'x-pack-%s.zip' % version, 'data')
@ -402,16 +423,18 @@ def main():
try:
# install plugins
remove_plugin(version, release_dir, 'license')
remove_plugin(version, release_dir, 'shield')
# Remove old plugins just in case any are around
remove_plugin(version, release_dir, 'marvel-agent')
remove_plugin(version, release_dir, 'watcher')
# remove the shield config too before fresh install
remove_plugin(version, release_dir, 'shield')
remove_plugin(version, release_dir, 'license')
# Remove the shield config too before fresh install
run('rm -rf %s' %(os.path.join(release_dir, 'config/shield')))
# Install the plugins we'll need
install_plugin(version, release_dir, 'license')
install_plugin(version, release_dir, 'shield')
install_plugin(version, release_dir, 'watcher')
# here we could also install watcher etc
install_plugin(version, release_dir, 'marvel-agent')
# create admin
run('%s useradd es_admin -r admin -p 0123456789' %(os.path.join(release_dir, 'bin/shield/esusers')))
@ -424,7 +447,7 @@ def main():
else:
generate_security_index(client, version)
generate_watcher_index(client, version)
# here we could also add watches, monitoring etc
wait_for_monitoring_index_to_fill(client, version)
shutdown_node(node)
node = None

View File

@ -133,7 +133,7 @@ public abstract class ExportBulk {
bulk.flush();
} catch (ExportException e) {
if (exception == null) {
exception = new ExportException("failed to flush export bulks");
exception = new ExportException("failed to flush export bulks", e);
}
exception.addExportException(e);
}

View File

@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
/**
* Creates aliases for monitoring indexes created by Marvel 2.3+.
*/
public class BackwardsCompatibilityAliasesResource extends HttpResource {
private static final Logger logger = Loggers.getLogger(BackwardsCompatibilityAliasesResource.class);
private final TimeValue masterTimeout;
/**
* Create a new {@link TemplateHttpResource}.
*
* @param resourceOwnerName The user-recognizable name.
* @param masterTimeout Master timeout to use with any request.
*/
public BackwardsCompatibilityAliasesResource(final String resourceOwnerName, @Nullable final TimeValue masterTimeout) {
super(resourceOwnerName);
this.masterTimeout = masterTimeout;
}
@Override
protected boolean doCheckAndPublish(RestClient client) {
boolean needNewAliases = false;
XContentBuilder request;
try {
Map<String, String> params = parameters();
params.put("filter_path", "*.aliases");
Response response = client.performRequest("GET", "/.marvel-es-1-*", params);
try (XContentParser parser = JsonXContent.jsonXContent.createParser(response.getEntity().getContent())) {
Map<String, Object> indices = parser.map();
request = JsonXContent.contentBuilder();
request.startObject().startArray("actions");
for (Map.Entry<String, Object> e : indices.entrySet()) {
String index = e.getKey();
String alias = ".monitoring-es-2-" + index.substring(".marvel-es-1-".length());
if (false == aliasesForIndex(e.getValue()).contains(alias)) {
needNewAliases = true;
addAlias(request, index, alias);
}
}
request.endArray().endObject();
}
} catch (ResponseException e) {
int statusCode = e.getResponse().getStatusLine().getStatusCode();
if (statusCode == RestStatus.NOT_FOUND.getStatus()) {
logger.debug("no 2.x monitoring indexes found so no need to create backwards compatibility aliases");
return true;
}
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to check for 2.x monitoring indexes with [{}]", statusCode),
e);
return false;
} catch (IOException | RuntimeException e) {
logger.error("failed to check for 2.x monitoring indexes", e);
return false;
}
if (false == needNewAliases) {
// Hurray! Nothing to do!
return true;
}
/* Looks like we have to create some new aliases. Note that this is a race with all other exporters on other nodes of Elasticsearch
* targeting this cluster. That is fine because this request is idemopotent, meaning that if it has no work to do it'll just return
* 200 OK { "acknowledged": true }. */
try {
BytesRef bytes = request.bytes().toBytesRef();
HttpEntity body = new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON);
Response response = client.performRequest("POST", "/_aliases", parameters(), body);
try (XContentParser parser = JsonXContent.jsonXContent.createParser(response.getEntity().getContent())) {
Map<String, Object> aliasesResponse = parser.map();
Boolean acked = (Boolean) aliasesResponse.get("acknowledged");
if (acked == null) {
logger.error("Unexpected response format from _aliases action {}", aliasesResponse);
return false;
}
return acked;
}
} catch (IOException | RuntimeException e) {
logger.error("failed to create aliases for 2.x monitoring indexes", e);
return false;
}
}
private Set<?> aliasesForIndex(Object indexInfo) {
Map<?, ?> info = (Map<?, ?>) indexInfo;
Map<?, ?> aliases = (Map<?, ?>) info.get("aliases");
return aliases.keySet();
}
/**
* Parameters to use for all requests.
*/
Map<String, String> parameters() {
Map<String, String> parameters = new HashMap<>();
if (masterTimeout != null) {
parameters.put("master_timeout", masterTimeout.getStringRep());
}
return parameters;
}
private void addAlias(XContentBuilder request, String index, String alias) throws IOException {
request.startObject().startObject("add");
{
request.field("index", index);
request.field("alias", alias);
}
request.endObject().endObject();
}
}

View File

@ -36,6 +36,9 @@ import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.ssl.SSLService;
import javax.net.ssl.SSLContext;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -121,6 +124,10 @@ public class HttpExporter extends Exporter {
* ES level timeout used when checking and writing pipelines (used to speed up tests)
*/
public static final String PIPELINE_CHECK_TIMEOUT_SETTING = "index.pipeline.master_timeout";
/**
* ES level timeout used when checking and writing aliases (used to speed up tests)
*/
public static final String ALIAS_TIMEOUT_SETTING = "index.alias.master_timeout";
/**
* Minimum supported version of the remote monitoring cluster.
@ -306,6 +313,8 @@ public class HttpExporter extends Exporter {
configureTemplateResources(config, resolvers, resourceOwnerName, resources);
// load the pipeline (this will get added to as the monitoring API version increases)
configurePipelineResources(config, resourceOwnerName, resources);
resources.add(new BackwardsCompatibilityAliasesResource(resourceOwnerName,
config.settings().getAsTime(ALIAS_TIMEOUT_SETTING, timeValueSeconds(30))));
return new MultiHttpResource(resourceOwnerName, resources);
}
@ -570,5 +579,4 @@ public class HttpExporter extends Exporter {
}
}
}
}

View File

@ -12,6 +12,10 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
@ -32,12 +36,12 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.security.InternalClient;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -149,9 +153,15 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return null;
}
if (null != prepareAddAliasesTo2xIndices(clusterState)) {
logger.debug("old monitoring indexes exist without aliases, waiting for them to get new aliases");
return null;
}
logger.trace("monitoring index templates and pipelines are installed, service can start");
} else {
// TODO we really shouldn't continually attempt to put the resources on every cluster state change. We should be patient.
// we are on the elected master
//
@ -184,6 +194,29 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
logger.trace("pipeline [{}] found", EXPORT_PIPELINE_NAME);
}
IndicesAliasesRequest addAliasesTo2xIndices = prepareAddAliasesTo2xIndices(clusterState);
if (addAliasesTo2xIndices == null) {
logger.trace("there are no 2.x monitoring indices or they have all the aliases they need");
} else {
logger.debug("there are 2.x monitoring indices and they are missing some aliases to make them compatible with 5.x");
client.execute(IndicesAliasesAction.INSTANCE, addAliasesTo2xIndices, new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse response) {
if (response.isAcknowledged()) {
logger.info("Added modern aliases to 2.x monitoring indices");
} else {
logger.info("Unable to add modern aliases to 2.x monitoring indices, response not acknowledged.");
}
}
@Override
public void onFailure(Exception e) {
logger.error("Unable to add modern aliases to 2.x monitoring indices", e);
}
});
installedSomething = true;
}
if (installedSomething) {
// let the cluster catch up (and because we do the PUTs asynchronously)
return null;
@ -362,6 +395,22 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
});
}
private IndicesAliasesRequest prepareAddAliasesTo2xIndices(ClusterState clusterState) {
IndicesAliasesRequest request = null;
for (IndexMetaData index : clusterState.metaData()) {
String name = index.getIndex().getName();
if (name.startsWith(".marvel-es-1-")) {
String alias = ".monitoring-es-2-" + name.substring(".marvel-es-1-".length());
if (index.getAliases().containsKey(alias)) continue;
if (request == null) {
request = new IndicesAliasesRequest();
}
request.addAliasAction(AliasActions.add().index(name).alias(alias));
}
}
return request;
}
enum State {
INITIALIZED,
RUNNING,

View File

@ -0,0 +1,190 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring;
import org.elasticsearch.AbstractOldXPackIndicesBackwardsCompatibilityTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.cluster.ClusterStateResolver;
import org.elasticsearch.xpack.monitoring.resolver.indices.IndexStatsResolver;
import org.elasticsearch.xpack.monitoring.resolver.indices.IndicesStatsResolver;
import org.elasticsearch.xpack.monitoring.resolver.node.NodeStatsResolver;
import org.elasticsearch.xpack.monitoring.resolver.shards.ShardsResolver;
import org.hamcrest.Matcher;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
/**
* Tests for monitoring indexes created before 5.0.
*/
public class OldMonitoringIndicesBackwardsCompatibilityIT extends AbstractOldXPackIndicesBackwardsCompatibilityTestCase {
private final boolean httpExporter = randomBoolean();
@Override
public Settings nodeSettings(int ord) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(ord))
.put(XPackSettings.MONITORING_ENABLED.getKey(), true)
// Don't clean old monitoring indexes - we want to make sure we can load them
.put(MonitoringSettings.HISTORY_DURATION.getKey(), TimeValue.timeValueHours(1000 * 365 * 24).getStringRep());
if (httpExporter) {
/* If we want to test the http exporter we have to create it but disable it. We need to create it so we don't use the default
* local exporter and we have to disable it because we don't yet know the port we've bound to. We can only get that once
* Elasticsearch starts so we'll enable the exporter then. */
settings.put(NetworkModule.HTTP_ENABLED.getKey(), true);
setupHttpExporter(settings, null);
}
return settings.build();
}
private void setupHttpExporter(Settings.Builder settings, Integer port) {
Map<String, String> httpExporter = new HashMap<>();
httpExporter.put("type", "http");
httpExporter.put("enabled", port == null ? "false" : "true");
httpExporter.put("host", "http://localhost:" + (port == null ? "does_not_matter" : port));
httpExporter.put("auth.username", SecuritySettingsSource.DEFAULT_USER_NAME);
httpExporter.put("auth.password", SecuritySettingsSource.DEFAULT_PASSWORD);
settings.putProperties(httpExporter, k -> true, k -> MonitoringSettings.EXPORTERS_SETTINGS.getKey() + "my_exporter." + k);
}
@Override
protected void checkVersion(Version version) throws Exception {
if (version.before(Version.V_2_3_0)) {
/* We can't do anything with indexes created before 2.3 so we just assert that we didn't delete them or do anything otherwise
* crazy. */
SearchResponse response = client().prepareSearch(".marvel-es-data").get();
// 2.0.x didn't index the nodes info
long expectedEsData = version.before(Version.V_2_1_0) ? 1 : 2;
assertHitCount(response, expectedEsData);
response = client().prepareSearch(".marvel-es-*").get();
assertThat(response.getHits().totalHits(), greaterThanOrEqualTo(20L));
return;
}
/* Indexes created from 2.3 onwards get aliased to the place they'd be if they were created in 5.0 so queries should just work.
* Monitoring doesn't really have a Java API so we can't test that, but we can test that we write the data we expected to write. */
if (httpExporter) {
// If we're using the http exporter we need feed it the port and enable it
NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().get();
TransportAddress publishAddress = nodeInfos.getNodes().get(0).getHttp().address().publishAddress();
assertEquals(1, publishAddress.uniqueAddressTypeId());
InetSocketAddress address = ((InetSocketTransportAddress) publishAddress).address();
Settings.Builder settings = Settings.builder();
setupHttpExporter(settings, address.getPort());
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get();
}
// Wait for the exporter to come online and add the aliases
long end = TimeUnit.SECONDS.toNanos(30) + System.nanoTime();
SearchResponse firstIndexStats;
while (true) {
try {
firstIndexStats = search(new IndexStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(10L));
break;
} catch (IndexNotFoundException e) {
if (System.nanoTime() - end > 0) {
throw e;
}
}
}
// All the other aliases should have been created by now so we can assert that we have the data we saved in the bwc indexes
SearchResponse firstShards = search(new ShardsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(10L));
SearchResponse firstIndicesStats = search(new IndicesStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
SearchResponse firstNodeStats = search(new NodeStatsResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
SearchResponse firstClusterState = search(new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY), greaterThanOrEqualTo(3L));
// Verify some stuff about the stuff in the backwards compatibility indexes
Arrays.stream(firstIndexStats.getHits().hits()).forEach(hit -> checkIndexStats(hit.sourceAsMap()));
Arrays.stream(firstShards.getHits().hits()).forEach(hit -> checkShards(hit.sourceAsMap()));
Arrays.stream(firstIndicesStats.getHits().hits()).forEach(hit -> checkIndicesStats(hit.sourceAsMap()));
Arrays.stream(firstNodeStats.getHits().hits()).forEach(hit -> checkNodeStats(hit.sourceAsMap()));
Arrays.stream(firstClusterState.getHits().hits()).forEach(hit -> checkClusterState(hit.sourceAsMap()));
// Wait for monitoring to accumulate some data about the current cluster
long indexStatsCount = firstIndexStats.getHits().totalHits();
assertBusy(() -> search(new IndexStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
greaterThan(indexStatsCount)), 1, TimeUnit.MINUTES);
assertBusy(() -> search(new ShardsResolver(MonitoredSystem.ES, Settings.EMPTY),
greaterThan(firstShards.getHits().totalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(new IndicesStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
greaterThan(firstIndicesStats.getHits().totalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(new NodeStatsResolver(MonitoredSystem.ES, Settings.EMPTY),
greaterThan(firstNodeStats.getHits().totalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY),
greaterThan(firstClusterState.getHits().totalHits())), 1, TimeUnit.MINUTES);
}
private SearchResponse search(MonitoringIndexNameResolver<?> resolver, Matcher<Long> hitCount) {
SearchResponse response = client().prepareSearch(resolver.indexPattern()).setTypes(resolver.type(null)).get();
assertThat(response.getHits().totalHits(), hitCount);
return response;
}
private void checkIndexStats(Map<String, Object> indexStats) {
checkMonitoringElement(indexStats);
Map<?, ?> stats = (Map<?, ?>) indexStats.get("index_stats");
assertThat(stats, hasKey("index"));
Map<?, ?> total = (Map<?, ?>) stats.get("total");
Map<?, ?> docs = (Map<?, ?>) total.get("docs");
// These might have been taken before all the documents were added so we can't assert a whole lot about the number
assertThat((Integer) docs.get("count"), greaterThanOrEqualTo(0));
}
private void checkShards(Map<String, Object> shards) {
checkMonitoringElement(shards);
Map<?, ?> shard = (Map<?, ?>) shards.get("shard");
assertThat(shard, allOf(hasKey("index"), hasKey("state"), hasKey("primary"), hasKey("node")));
}
private void checkIndicesStats(Map<String, Object> indicesStats) {
checkMonitoringElement(indicesStats);
Map<?, ?> stats = (Map<?, ?>) indicesStats.get("indices_stats");
Map<?, ?> all = (Map<?, ?>) stats.get("_all");
Map<?, ?> primaries = (Map<?, ?>) all.get("primaries");
Map<?, ?> docs = (Map<?, ?>) primaries.get("docs");
// These might have been taken before all the documents were added so we can't assert a whole lot about the number
assertThat((Integer) docs.get("count"), greaterThanOrEqualTo(0));
}
@SuppressWarnings("unchecked")
private void checkNodeStats(Map<String, Object> nodeStats) {
checkMonitoringElement(nodeStats);
Map<?, ?> stats = (Map<?, ?>) nodeStats.get("node_stats");
assertThat(stats, allOf(hasKey("node_id"), hasKey("node_master"), hasKey("mlockall"), hasKey("disk_threshold_enabled"),
hasKey("indices"), hasKey("process"), hasKey("jvm"), hasKey("thread_pool")));
}
private void checkClusterState(Map<String, Object> clusterState) {
checkMonitoringElement(clusterState);
Map<?, ?> stats = (Map<?, ?>) clusterState.get("cluster_state");
assertThat(stats, allOf(hasKey("status"), hasKey("version"), hasKey("state_uuid"), hasKey("master_node"), hasKey("nodes")));
}
private void checkMonitoringElement(Map<String, Object> element) {
assertThat(element, allOf(hasKey("cluster_uuid"), hasKey("timestamp")));
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import okio.Buffer;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
@ -24,8 +25,10 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
@ -40,7 +43,6 @@ import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.monitoring.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.joda.time.format.DateTimeFormat;
import org.junit.After;
import org.junit.Before;
@ -60,10 +62,12 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.FILTER_PATH_NONE;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class HttpExporterIT extends MonitoringIntegTestCase {
@ -90,13 +94,14 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
public void testExport() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
final Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
final Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServerContainer.getFormattedAddress());
@ -105,13 +110,15 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready);
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
assertBulk(webServer, nbDocs);
}
public void testExportWithHeaders() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
final String headerValue = randomAsciiOfLengthBetween(3, 9);
final String[] array = generateRandomStringArray(2, 4, false);
@ -123,11 +130,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
headers.put("Array-Check", array);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServerContainer.getFormattedAddress())
.put("xpack.monitoring.exporters._http.headers.X-Cloud-Cluster", headerValue)
@ -139,7 +145,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, headers, null);
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist, headers, null);
assertBulk(webServer, nbDocs, headers, null);
}
@ -147,6 +153,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final boolean useHeaders = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
final String headerValue = randomAsciiOfLengthBetween(3, 9);
final String[] array = generateRandomStringArray(2, 4, false);
@ -160,7 +168,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
String basePath = "path/to";
@ -177,17 +185,15 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
basePath = "/" + basePath;
}
final Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
final Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServerContainer.getFormattedAddress())
.put("xpack.monitoring.exporters._http.proxy.base_path", basePath + (randomBoolean() ? "/" : ""));
if (useHeaders) {
builder
.put("xpack.monitoring.exporters._http.headers.X-Cloud-Cluster", headerValue)
.put("xpack.monitoring.exporters._http.headers.X-Found-Cluster", headerValue)
.putArray("xpack.monitoring.exporters._http.headers.Array-Check", array);
builder.put("xpack.monitoring.exporters._http.headers.X-Cloud-Cluster", headerValue)
.put("xpack.monitoring.exporters._http.headers.X-Found-Cluster", headerValue)
.putArray("xpack.monitoring.exporters._http.headers.Array-Check", array);
}
internalCluster().startNode(builder);
@ -195,28 +201,30 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, headers, basePath);
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist, headers,
basePath);
assertBulk(webServer, nbDocs, headers, basePath);
}
public void testHostChangeReChecksTemplate() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServerContainer.getFormattedAddress());
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
internalCluster().startNode(builder);
export(Collections.singletonList(newRandomMonitoringDoc()));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready);
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
assertBulk(webServer);
try (final MockWebServerContainer secondWebServerContainer = new MockWebServerContainer(webServerContainer.getPort() + 1)) {
@ -237,6 +245,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
// opposite of if it existed before
enqueuePipelineResponses(secondWebServer, !pipelineExistsAlready);
enqueueBackwardsCompatibilityAliasResponse(secondWebServer, bwcIndexesExist, true);
enqueueResponse(secondWebServer, 200, "{\"errors\": false}");
logger.info("--> exporting a second event");
@ -252,24 +261,24 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
if (template.v1().contains(MonitoringBulkTimestampedResolver.Data.DATA) == false) {
recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.v1() + resourceQueryString()));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.v1() + resourceQueryString()));
assertThat(recordedRequest.getBody().readUtf8(), equalTo(template.v2()));
}
}
assertMonitorPipelines(secondWebServer, !pipelineExistsAlready, null, null);
assertMonitorBackwardsCompatibilityAliases(secondWebServer, false, null, null);
assertBulk(secondWebServer);
}
}
public void testUnsupportedClusterVersion() throws Exception {
Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServerContainer.getFormattedAddress());
// returning an unsupported cluster version
enqueueGetClusterVersionResponse(randomFrom(Version.fromString("0.18.0"), Version.fromString("1.0.0"),
Version.fromString("1.4.0"), Version.fromString("2.4.0")));
enqueueGetClusterVersionResponse(randomFrom(Version.fromString("0.18.0"), Version.fromString("1.0.0"), Version.fromString("1.4.0"),
Version.fromString("2.4.0")));
String agentNode = internalCluster().startNode(builder);
@ -284,22 +293,23 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
public void testDynamicIndexFormatChange() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", webServerContainer.getFormattedAddress());
internalCluster().startNode(builder);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
MonitoringDoc doc = newRandomMonitoringDoc();
export(Collections.singletonList(doc));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready);
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
RecordedRequest recordedRequest = assertBulk(webServer);
@SuppressWarnings("unchecked")
@ -312,11 +322,11 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertThat(index.get("_index"), equalTo(indexName));
String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put("xpack.monitoring.exporters._http.index.name.time_format", newTimeFormat)));
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("xpack.monitoring.exporters._http.index.name.time_format", newTimeFormat)));
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueTemplateAndPipelineResponses(webServer, true, true);
enqueueSetupResponses(webServer, true, true, false, false);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
doc = newRandomMonitoringDoc();
@ -325,7 +335,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
String expectedMonitoringIndex = ".monitoring-es-" + MonitoringTemplateUtils.TEMPLATE_VERSION + "-"
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp());
assertMonitorResources(webServer, true, true);
assertMonitorResources(webServer, true, true, false, false);
recordedRequest = assertBulk(webServer);
bytes = recordedRequest.getBody().readByteArray();
@ -339,9 +349,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertMonitorVersion(webServer, null, null);
}
private void assertMonitorVersion(final MockWebServer webServer,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath)
throws Exception {
private void assertMonitorVersion(final MockWebServer webServer, @Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
final RecordedRequest request = webServer.takeRequest();
@ -350,24 +359,22 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertHeaders(request, customHeaders);
}
private void assertMonitorResources(final MockWebServer webServer,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists)
throws Exception {
assertMonitorResources(webServer, templateAlreadyExists, pipelineAlreadyExists, null, null);
private void assertMonitorResources(final MockWebServer webServer, final boolean templateAlreadyExists,
final boolean pipelineAlreadyExists, boolean bwcIndexesExist, boolean bwcAliasesExist) throws Exception {
assertMonitorResources(webServer, templateAlreadyExists, pipelineAlreadyExists, bwcIndexesExist, bwcAliasesExist, null, null);
}
private void assertMonitorResources(final MockWebServer webServer,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath)
throws Exception {
private void assertMonitorResources(final MockWebServer webServer, final boolean templateAlreadyExists,
final boolean pipelineAlreadyExists, boolean bwcIndexesExist, boolean bwcAliasesExist,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
assertMonitorVersion(webServer, customHeaders, basePath);
assertMonitorTemplates(webServer, templateAlreadyExists, customHeaders, basePath);
assertMonitorPipelines(webServer, pipelineAlreadyExists, customHeaders, basePath);
assertMonitorBackwardsCompatibilityAliases(webServer, bwcIndexesExist && false == bwcAliasesExist, customHeaders, basePath);
}
private void assertMonitorTemplates(final MockWebServer webServer, final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath)
throws Exception {
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
RecordedRequest request;
@ -390,8 +397,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
private void assertMonitorPipelines(final MockWebServer webServer, final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath)
throws Exception {
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
RecordedRequest request = webServer.takeRequest();
@ -404,12 +410,35 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getPath(),
equalTo(pathPrefix + "/_ingest/pipeline/" + Exporter.EXPORT_PIPELINE_NAME + resourceQueryString()));
equalTo(pathPrefix + "/_ingest/pipeline/" + Exporter.EXPORT_PIPELINE_NAME + resourceQueryString()));
assertThat(request.getBody().readUtf8(), equalTo(Exporter.emptyPipeline(XContentType.JSON).string()));
assertHeaders(request, customHeaders);
}
}
private void assertMonitorBackwardsCompatibilityAliases(final MockWebServer webServer, final boolean expectPost,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
RecordedRequest request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getPath(), startsWith(pathPrefix + "/.marvel-es-1-*"));
assertThat(request.getPath(), containsString("filter_path=*.aliases"));
assertThat(request.getPath(), containsString("master_timeout=30s"));
assertHeaders(request, customHeaders);
if (expectPost) {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("POST"));
assertThat(request.getPath(), startsWith(pathPrefix + "/_aliases"));
assertThat(request.getPath(), containsString("master_timeout=30s"));
assertThat(request.getBody().readUtf8(), containsString("add"));
assertHeaders(request, customHeaders);
}
}
private RecordedRequest assertBulk(final MockWebServer webServer) throws Exception {
return assertBulk(webServer, -1);
}
@ -418,10 +447,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
return assertBulk(webServer, docs, null, null);
}
private RecordedRequest assertBulk(final MockWebServer webServer, final int docs,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath)
throws Exception {
private RecordedRequest assertBulk(final MockWebServer webServer, final int docs, @Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
final RecordedRequest request = webServer.takeRequest();
@ -521,17 +548,15 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
private void enqueueGetClusterVersionResponse(MockWebServer mockWebServer, Version v) throws IOException {
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(
jsonBuilder()
.startObject().startObject("version").field("number", v.toString()).endObject().endObject().bytes()
.utf8ToString()));
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(jsonBuilder().startObject().startObject("version")
.field("number", v.toString()).endObject().endObject().bytes().utf8ToString()));
}
private void enqueueTemplateAndPipelineResponses(final MockWebServer webServer,
final boolean templatesAlreadyExists, final boolean pipelineAlreadyExists)
throws IOException {
private void enqueueSetupResponses(MockWebServer webServer, boolean templatesAlreadyExists, boolean pipelineAlreadyExists,
boolean bwcIndexesExist, boolean bwcAliasesExist) throws IOException {
enqueueTemplateResponses(webServer, templatesAlreadyExists);
enqueuePipelineResponses(webServer, pipelineAlreadyExists);
enqueueBackwardsCompatibilityAliasResponse(webServer, bwcIndexesExist, bwcAliasesExist);
}
private void enqueueTemplateResponses(final MockWebServer webServer, final boolean alreadyExists) throws IOException {
@ -572,6 +597,38 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
enqueueResponse(webServer, 200, "pipeline [" + Exporter.EXPORT_PIPELINE_NAME + "] exists");
}
private void enqueueBackwardsCompatibilityAliasResponse(MockWebServer webServer, boolean bwcIndexesExist, boolean bwcAliasesExist)
throws IOException {
if (false == bwcIndexesExist && randomBoolean()) {
enqueueResponse(webServer, 404, "index does not exist");
return;
}
XContentBuilder response = JsonXContent.contentBuilder().prettyPrint().startObject();
if (bwcIndexesExist) {
int timestampIndexes = between(1, 100);
for (int i = 0; i < timestampIndexes; i++) {
writeIndex(response, ".marvel-es-1-" + i, bwcAliasesExist ? ".monitoring-es-2-" + i : "ignored");
}
}
response.endObject();
enqueueResponse(webServer, 200, response.string());
if (bwcIndexesExist && false == bwcAliasesExist) {
enqueueResponse(webServer, 200, "{\"acknowledged\": true}");
}
}
private void writeIndex(XContentBuilder response, String index, String alias) throws IOException {
response.startObject(index);
{
response.startObject("aliases");
{
response.startObject(alias).endObject();
}
response.endObject();
}
response.endObject();
}
private void enqueueResponse(int responseCode, String body) throws IOException {
enqueueResponse(webServer, responseCode, body);
}

View File

@ -221,6 +221,7 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
whenSuccessfulPutTemplates(unsuccessfulGetTemplates);
whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines);
whenSuccessfulPutPipelines(1);
whenSuccessfulBackwardsCompatibilityAliases();
assertTrue(resources.isDirty());
@ -233,6 +234,7 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
verifyPutPipelines(unsuccessfulGetPipelines);
verifyBackwardsCompatibilityAliases();
verifyNoMoreInteractions(client);
}
@ -353,6 +355,22 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
}
}
private void whenSuccessfulBackwardsCompatibilityAliases() throws IOException {
// Just return no indexes so we won't have to mock adding aliases
final Response response = mock(Response.class);
final StatusLine statusLine = mock(StatusLine.class);
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(RestStatus.OK.getStatus());
when(response.getEntity()).thenReturn(new StringEntity("{}"));
when(client.performRequest(eq("GET"),
startsWith("/.marvel-es-1-*"),
anyMapOf(String.class, String.class)))
.thenReturn(response);
}
private void verifyVersionCheck() throws IOException {
verify(client).performRequest(eq("GET"), eq("/"), anyMapOf(String.class, String.class));
}
@ -379,4 +397,7 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
any(HttpEntity.class)); // raw template
}
private void verifyBackwardsCompatibilityAliases() throws IOException {
verify(client).performRequest(eq("GET"), startsWith("/.marvel-es-1-*"), anyMapOf(String.class, String.class));
}
}

View File

@ -256,6 +256,7 @@ public class HttpExporterTests extends ESTestCase {
final boolean useIngest = randomBoolean();
final TimeValue templateTimeout = randomFrom(TimeValue.timeValueSeconds(30), null);
final TimeValue pipelineTimeout = randomFrom(TimeValue.timeValueSeconds(30), null);
final TimeValue aliasTimeout = randomFrom(TimeValue.timeValueSeconds(30), null);
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http");
@ -265,12 +266,16 @@ public class HttpExporterTests extends ESTestCase {
}
if (templateTimeout != null) {
builder.put("xpack.monitoring.exporters._http.index.template.master_timeout", templateTimeout.toString());
builder.put("xpack.monitoring.exporters._http.index.template.master_timeout", templateTimeout.getStringRep());
}
// note: this shouldn't get used with useIngest == false, but it doesn't hurt to try to cause issues
if (pipelineTimeout != null) {
builder.put("xpack.monitoring.exporters._http.index.pipeline.master_timeout", pipelineTimeout.toString());
builder.put("xpack.monitoring.exporters._http.index.pipeline.master_timeout", pipelineTimeout.getStringRep());
}
if (aliasTimeout != null) {
builder.put("xpack.monitoring.exporters._http.index.aliases.master_timeout", aliasTimeout.getStringRep());
}
final Config config = createConfig(builder.build());
@ -287,16 +292,22 @@ public class HttpExporterTests extends ESTestCase {
resources.stream().filter((resource) -> resource instanceof PipelineHttpResource)
.map(PipelineHttpResource.class::cast)
.collect(Collectors.toList());
final List<BackwardsCompatibilityAliasesResource> bwc =
resources.stream().filter(resource -> resource instanceof BackwardsCompatibilityAliasesResource)
.map(BackwardsCompatibilityAliasesResource.class::cast)
.collect(Collectors.toList());
// expected number of resources
assertThat(multiResource.getResources().size(), equalTo(version + templates.size() + pipelines.size()));
assertThat(multiResource.getResources().size(), equalTo(version + templates.size() + pipelines.size() + bwc.size()));
assertThat(version, equalTo(1));
assertThat(templates, hasSize(3));
assertThat(pipelines, hasSize(useIngest ? 1 : 0));
assertThat(bwc, hasSize(1));
// timeouts
assertMasterTimeoutSet(templates, templateTimeout);
assertMasterTimeoutSet(pipelines, pipelineTimeout);
assertMasterTimeoutSet(bwc, aliasTimeout);
// logging owner names
final List<String> uniqueOwners =
@ -401,10 +412,15 @@ public class HttpExporterTests extends ESTestCase {
}
}
private void assertMasterTimeoutSet(final List<? extends PublishableHttpResource> resources, final TimeValue timeout) {
private void assertMasterTimeoutSet(final List<? extends HttpResource> resources, final TimeValue timeout) {
if (timeout != null) {
for (final PublishableHttpResource resource : resources) {
assertThat(resource.getParameters().get("master_timeout"), equalTo(timeout.toString()));
for (final HttpResource resource : resources) {
if (resource instanceof PublishableHttpResource) {
assertEquals(timeout.getStringRep(), ((PublishableHttpResource) resource).getParameters().get("master_timeout"));
} else if (resource instanceof BackwardsCompatibilityAliasesResource) {
assertEquals(timeout.getStringRep(),
((BackwardsCompatibilityAliasesResource) resource).parameters().get("master_timeout"));
}
}
}
}

View File

@ -113,15 +113,18 @@ public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase exte
for (String dataFile : dataFiles) {
Version version = Version.fromString(dataFile.replace("x-pack-", "").replace(".zip", ""));
if (false == shouldTestVersion(version)) continue;
long clusterStartTime = System.nanoTime();
setupCluster(dataFile);
ensureYellow();
long startTime = System.nanoTime();
long testStartTime = System.nanoTime();
try {
checkVersion(version);
} catch (Throwable t) {
throw new AssertionError("Failed while checking [" + version + "]", t);
}
logger.info("--> Done testing {}, took {} millis", version, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
logger.info("--> Done testing [{}]. Setting up cluster took [{}] millis and testing took [{}] millis", version,
TimeUnit.NANOSECONDS.toMillis(testStartTime - clusterStartTime),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - testStartTime));
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.AbstractOldXPackIndicesBackwardsCompatibilityTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchResponse;