[Monitoring] BWC Allow New Types to be added to .monitoring-data-2 index (elastic/elasticsearch#4504)

* [Monitoring] BWC Allow New Types to be added to .monitoring-data-2 index

This adds the new Logstash (and technically Kibana) types to the index mapping if they're not already there. Pre-existing indices will be blocked from creating new types in the index due to the index setting. The index setting cannot be flipped without opening/closing the index, so manually adding the new types is easier.

Original commit: elastic/x-pack-elasticsearch@e85e800335
This commit is contained in:
Chris Earle 2017-01-03 16:47:09 -05:00 committed by GitHub
parent cd2e608ecc
commit 0d62207f8f
19 changed files with 1046 additions and 510 deletions

View File

@ -44,10 +44,6 @@ public abstract class Exporter implements AutoCloseable {
return config;
}
public boolean masterOnly() {
return false;
}
/** Returns true if only one instance of this exporter should be allowed. */
public boolean isSingleton() {
return false;

View File

@ -31,19 +31,15 @@ import static java.util.Collections.emptyMap;
public class Exporters extends AbstractLifecycleComponent implements Iterable<Exporter> {
private final Map<String, Exporter.Factory> factories;
private final ClusterService clusterService;
private final AtomicReference<Map<String, Exporter>> exporters;
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
ClusterService clusterService) {
public Exporters(Settings settings, Map<String, Exporter.Factory> factories, ClusterService clusterService) {
super(settings);
this.factories = factories;
this.clusterService = clusterService;
this.exporters = new AtomicReference<>(emptyMap());
clusterService.getClusterSettings().addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS,
this::setExportersSetting);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS, this::setExportersSetting);
}
private void setExportersSetting(Settings exportersSetting) {
@ -92,15 +88,10 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
ExportBulk openBulk() {
List<ExportBulk> bulks = new ArrayList<>();
for (Exporter exporter : this) {
if (exporter.masterOnly() && clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
// the exporter is supposed to only run on the master node, but we're not
// the master node... so skipping
continue;
}
try {
ExportBulk bulk = exporter.openBulk();
if (bulk == null) {
logger.info("skipping exporter [{}] as it isn't ready yet", exporter.name());
logger.info("skipping exporter [{}] as it is not ready yet", exporter.name());
} else {
bulks.add(bulk);
}
@ -168,15 +159,14 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
throw new ExportException("Export service is not started");
}
if (docs != null && docs.size() > 0) {
ExportBulk bulk = openBulk();
if (bulk == null) {
throw new ExportException("exporters are either not ready or faulty");
}
final ExportBulk bulk = openBulk();
try {
bulk.add(docs);
} finally {
bulk.close(lifecycleState() == Lifecycle.State.STARTED);
if (bulk != null) {
try {
bulk.add(docs);
} finally {
bulk.close(lifecycleState() == Lifecycle.State.STARTED);
}
}
}
}

View File

@ -17,6 +17,14 @@ public final class MonitoringTemplateUtils {
/** Current version of es and data templates **/
public static final String TEMPLATE_VERSION = "2";
/**
* The name of the non-timestamped data index.
*/
public static final String DATA_INDEX = ".monitoring-data-" + TEMPLATE_VERSION;
/**
* Data types that should be supported by the {@linkplain #DATA_INDEX data index} that were not by the initial release.
*/
public static final String[] NEW_DATA_TYPES = { "kibana" };
private MonitoringTemplateUtils() {
}

View File

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Collections;
import java.util.Objects;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
/**
* {@linkplain DataTypeMappingHttpResource}s allow the checking and adding of index mapping's for new types that did not exist in previous
* versions.
* <p>
* This allows the use of Monitoring's REST endpoint to publish Kibana data to the data index even if the "kibana" type did not
* exist in their existing index mapping (e.g., they started with an early alpha release). Additionally, this also enables future types to
* be added without issue.
* <p>
* The root need for this is because the index mapping started with an index setting: "index.mapper.dynamic" set to false. This prevents
* new types from being dynamically added, which is obviously needed as new components (e.g., Kibana and Logstash) are monitored.
* Unfortunately, this setting cannot be flipped without also closing and reopening the index, so the fix is to manually add any new types.
*/
public class DataTypeMappingHttpResource extends PublishableHttpResource {
private static final Logger logger = Loggers.getLogger(DataTypeMappingHttpResource.class);
/**
* The name of the type that is created in the mappings on the remote cluster.
*/
private final String typeName;
/**
* Create a new {@link DataTypeMappingHttpResource}.
*
* @param resourceOwnerName The user-recognizable name
* @param masterTimeout Master timeout to use with any request.
* @param typeName The name of the mapping type (e.g., "kibana").
*/
public DataTypeMappingHttpResource(final String resourceOwnerName, @Nullable final TimeValue masterTimeout,
final String typeName) {
// we need to inspect the mappings, so we don't use filter_path to get rid of them
super(resourceOwnerName, masterTimeout, Collections.emptyMap());
this.typeName = Objects.requireNonNull(typeName);
}
/**
* Determine if the current {@linkplain #typeName type} exists.
*/
@Override
protected CheckResponse doCheck(final RestClient client) {
final Tuple<CheckResponse, Response> resource =
checkForResource(client, logger,
"/" + DATA_INDEX + "/_mapping", typeName, "monitoring mapping type",
resourceOwnerName, "monitoring cluster");
// depending on the content, we need to flip the actual response
CheckResponse checkResponse = resource.v1();
if (checkResponse == CheckResponse.EXISTS && resource.v2().getEntity().getContentLength() <= 2) {
// it "exists" if the index exists at all; it doesn't guarantee that the mapping exists
// the content will be "{}" if no mapping exists
checkResponse = CheckResponse.DOES_NOT_EXIST;
} else if (checkResponse == CheckResponse.DOES_NOT_EXIST) {
// DNE indicates that the entire index is missing, which means the template will create it; we only add types!
checkResponse = CheckResponse.EXISTS;
}
return checkResponse;
}
/**
* Add the current {@linkplain #typeName type} to the index's mappings.
*/
@Override
protected boolean doPublish(final RestClient client) {
// this could be a class-level constant, but it does not need to live the entire duration of ES; only the few times it is used
final HttpEntity disabledEntity = new StringEntity("{\"enabled\":false}", ContentType.APPLICATION_JSON);
return putResource(client, logger,
"/" + DATA_INDEX + "/_mapping", typeName, () -> disabledEntity, "monitoring mapping type",
resourceOwnerName, "monitoring cluster");
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.ssl.SSLService;
@ -520,6 +521,13 @@ public class HttpExporter extends Exporter {
final TimeValue templateTimeout = config.settings().getAsTime(TEMPLATE_CHECK_TIMEOUT_SETTING, null);
final Set<String> templateNames = new HashSet<>();
// add a resource to check the index mappings of the .monitoring-data-# index
// We ensure (and add if it's not) that the kibana type is there for the index for those few customers that upgraded from alphas;
// this step makes it very easy to add logstash in 5.2+ (and eventually beats)
for (final String type : MonitoringTemplateUtils.NEW_DATA_TYPES) {
resources.add(new DataTypeMappingHttpResource(resourceOwnerName, templateTimeout, type));
}
for (final MonitoringIndexNameResolver resolver : resolvers) {
final String templateName = resolver.templateName();

View File

@ -57,9 +57,9 @@ public class PipelineHttpResource extends PublishableHttpResource {
*/
@Override
protected CheckResponse doCheck(final RestClient client) {
return checkForResource(client, logger,
"/_ingest/pipeline", pipelineName, "monitoring pipeline",
resourceOwnerName, "monitoring cluster");
return simpleCheckForResource(client, logger,
"/_ingest/pipeline", pipelineName, "monitoring pipeline",
resourceOwnerName, "monitoring cluster");
}
/**

View File

@ -13,6 +13,7 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
@ -144,7 +145,8 @@ public abstract class PublishableHttpResource extends HttpResource {
/**
* Determine if the current {@code resourceName} exists at the {@code resourceBasePath} endpoint.
* <p>
* This provides the base-level check for any resource that does not need to inspect its actual contents.
* This provides the base-level check for any resource that does not need to care about its response beyond existence (and likely does
* not need to inspect its contents).
*
* @param client The REST client to make the request(s).
* @param logger The logger to use for status messages.
@ -155,10 +157,33 @@ public abstract class PublishableHttpResource extends HttpResource {
* @param resourceOwnerType The type of resource owner being dealt with (e.g., "monitoring cluster").
* @return Never {@code null}.
*/
protected CheckResponse checkForResource(final RestClient client, final Logger logger,
final String resourceBasePath,
final String resourceName, final String resourceType,
final String resourceOwnerName, final String resourceOwnerType) {
protected CheckResponse simpleCheckForResource(final RestClient client, final Logger logger,
final String resourceBasePath,
final String resourceName, final String resourceType,
final String resourceOwnerName, final String resourceOwnerType) {
return checkForResource(client, logger, resourceBasePath, resourceName, resourceType, resourceOwnerName, resourceOwnerType).v1();
}
/**
* Determine if the current {@code resourceName} exists at the {@code resourceBasePath} endpoint.
* <p>
* This provides the base-level check for any resource that cares about existence and also its contents.
*
* @param client The REST client to make the request(s).
* @param logger The logger to use for status messages.
* @param resourceBasePath The base path/endpoint to check for the resource (e.g., "/_template").
* @param resourceName The name of the resource (e.g., "template123").
* @param resourceType The type of resource (e.g., "monitoring template").
* @param resourceOwnerName The user-recognizeable resource owner.
* @param resourceOwnerType The type of resource owner being dealt with (e.g., "monitoring cluster").
* @return Never {@code null} pair containing the checked response and the returned response.
* The response will only ever be {@code null} if none was returned.
* @see #simpleCheckForResource(RestClient, Logger, String, String, String, String, String)
*/
protected Tuple<CheckResponse, Response> checkForResource(final RestClient client, final Logger logger,
final String resourceBasePath,
final String resourceName, final String resourceType,
final String resourceOwnerName, final String resourceOwnerType) {
logger.trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
try {
@ -169,18 +194,19 @@ public abstract class PublishableHttpResource extends HttpResource {
if (response.getStatusLine().getStatusCode() == RestStatus.OK.getStatus()) {
logger.debug("{} [{}] found on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
return CheckResponse.EXISTS;
return new Tuple<>(CheckResponse.EXISTS, response);
} else {
throw new ResponseException(response);
}
} catch (final ResponseException e) {
final int statusCode = e.getResponse().getStatusLine().getStatusCode();
final Response response = e.getResponse();
final int statusCode = response.getStatusLine().getStatusCode();
// 404
if (statusCode == RestStatus.NOT_FOUND.getStatus()) {
logger.debug("{} [{}] does not exist on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
return CheckResponse.DOES_NOT_EXIST;
return new Tuple<>(CheckResponse.DOES_NOT_EXIST, response);
} else {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to verify {} [{}] on the [{}] {} with status code [{}]",
@ -188,7 +214,7 @@ public abstract class PublishableHttpResource extends HttpResource {
e);
// weirder failure than below; block responses just like other unexpected failures
return CheckResponse.ERROR;
return new Tuple<>(CheckResponse.ERROR, response);
}
} catch (IOException | RuntimeException e) {
logger.error((Supplier<?>) () ->
@ -197,7 +223,7 @@ public abstract class PublishableHttpResource extends HttpResource {
e);
// do not attempt to publish the resource because we're in a broken state
return CheckResponse.ERROR;
return new Tuple<>(CheckResponse.ERROR, null);
}
}

View File

@ -58,9 +58,9 @@ public class TemplateHttpResource extends PublishableHttpResource {
*/
@Override
protected CheckResponse doCheck(final RestClient client) {
return checkForResource(client, logger,
"/_template", templateName, "monitoring template",
resourceOwnerName, "monitoring cluster");
return simpleCheckForResource(client, logger,
"/_template", templateName, "monitoring template",
resourceOwnerName, "monitoring cluster");
}
/**

View File

@ -17,6 +17,8 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasA
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
@ -41,6 +43,7 @@ import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.security.InternalClient;
@ -137,126 +140,16 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
}
// List of distinct templates
Map<String, String> templates = StreamSupport.stream(new ResolversRegistry(Settings.EMPTY).spliterator(), false)
final Map<String, String> templates = StreamSupport.stream(new ResolversRegistry(Settings.EMPTY).spliterator(), false)
.collect(Collectors.toMap(MonitoringIndexNameResolver::templateName, MonitoringIndexNameResolver::template, (a, b) -> a));
// if this is not the master, we'll just look to see if the monitoring templates are installed.
// If they all are, we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state.
if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
for (String template : templates.keySet()) {
if (hasTemplate(template, clusterState) == false) {
// the required template is not yet installed in the given cluster state, we'll wait.
logger.debug("monitoring index template [{}] does not exist, so service cannot start", template);
return null;
}
}
// if we don't have the ingest pipeline, then it's going to fail anyway
if (hasIngestPipelines(clusterState) == false) {
logger.debug("monitoring ingest pipeline [{}] does not exist, so service cannot start", EXPORT_PIPELINE_NAME);
// if this is not the master, we just need to make sure the master has set things up
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
if (setupIfElectedMaster(clusterState, templates) == false) {
return null;
}
if (null != prepareAddAliasesTo2xIndices(clusterState)) {
logger.debug("old monitoring indexes exist without aliases, waiting for them to get new aliases");
return null;
}
logger.trace("monitoring index templates and pipelines are installed, service can start");
} else {
// we are on the elected master
// Check that there is nothing that could block metadata updates
if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) {
logger.debug("waiting until metadata writes are unblocked");
return null;
}
if (installingSomething.get() == true) {
logger.trace("already installing something, waiting for install to complete");
return null;
}
// build a list of runnables for everything that is missing, but do not start execution
final List<Runnable> asyncActions = new ArrayList<>();
final AtomicInteger pendingResponses = new AtomicInteger(0);
// Check that each required template exist, installing it if needed
final List<Entry<String, String>> missingTemplates = templates.entrySet()
.stream()
.filter((e) -> hasTemplate(e.getKey(), clusterState) == false)
.collect(Collectors.toList());
if (missingTemplates.isEmpty() == false) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("template {} not found",
missingTemplates.stream().map(Map.Entry::getKey).collect(Collectors.toList())));
for (Entry<String, String> template : missingTemplates) {
asyncActions.add(() -> putTemplate(template.getKey(), template.getValue(),
new ResponseActionListener<>("template", template.getKey(), pendingResponses)));
}
}
// if we don't have the ingest pipeline, then install it
if (hasIngestPipelines(clusterState) == false) {
logger.debug("pipeline [{}] not found", EXPORT_PIPELINE_NAME);
asyncActions.add(() -> putIngestPipeline(new ResponseActionListener<>("pipeline", EXPORT_PIPELINE_NAME, pendingResponses)));
} else {
logger.trace("pipeline [{}] found", EXPORT_PIPELINE_NAME);
}
IndicesAliasesRequest addAliasesTo2xIndices = prepareAddAliasesTo2xIndices(clusterState);
if (addAliasesTo2xIndices == null) {
logger.trace("there are no 2.x monitoring indices or they have all the aliases they need");
} else {
final List<String> monitoringIndices2x = addAliasesTo2xIndices.getAliasActions().stream()
.flatMap((a) -> Arrays.stream(a.indices()))
.collect(Collectors.toList());
logger.debug("there are 2.x monitoring indices {} and they are missing some aliases to make them compatible with 5.x",
monitoringIndices2x);
asyncActions.add(() -> client.execute(IndicesAliasesAction.INSTANCE, addAliasesTo2xIndices,
new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse response) {
responseReceived();
if (response.isAcknowledged()) {
logger.info("Added modern aliases to 2.x monitoring indices {}", monitoringIndices2x);
} else {
logger.info("Unable to add modern aliases to 2.x monitoring indices {}, response not acknowledged.",
monitoringIndices2x);
}
}
@Override
public void onFailure(Exception e) {
responseReceived();
logger.error((Supplier<?>)
() -> new ParameterizedMessage("Unable to add modern aliases to 2.x monitoring indices {}",
monitoringIndices2x), e);
}
private void responseReceived() {
if (pendingResponses.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
}
}
}
}));
}
if (asyncActions.size() > 0) {
if (installingSomething.compareAndSet(false, true)) {
pendingResponses.set(asyncActions.size());
asyncActions.forEach(Runnable::run);
} else {
// let the cluster catch up since requested installations may be ongoing
return null;
}
} else {
logger.debug("monitoring index templates and pipelines are installed on master node, service can start");
}
} else if (setupIfNotElectedMaster(clusterState, templates.keySet()) == false) {
return null;
}
if (state.compareAndSet(State.INITIALIZED, State.RUNNING)) {
@ -266,6 +159,200 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return new LocalBulk(name(), logger, client, resolvers, config.settings().getAsBoolean(USE_INGEST_PIPELINE_SETTING, true));
}
/**
* When not on the elected master, we require all resources (mapping types, templates, and pipelines) to be available before we
* attempt to run the exporter. If those resources do not exist, then it means the elected master's exporter has not yet run, so the
* monitoring cluster (this one, as the local exporter) is not setup yet.
*
* @param clusterState The current cluster state.
* @param templates All template names that should exist.
* @return {@code true} indicates that all resources are available and the exporter can be used. {@code false} to stop and wait.
*/
private boolean setupIfNotElectedMaster(final ClusterState clusterState, final Set<String> templates) {
for (final String type : MonitoringTemplateUtils.NEW_DATA_TYPES) {
if (hasMappingType(type, clusterState) == false) {
// the required type is not yet there in the given cluster state, we'll wait.
logger.debug("monitoring index mapping [{}] does not exist in [{}], so service cannot start",
type, MonitoringTemplateUtils.DATA_INDEX);
return false;
}
}
for (final String template : templates) {
if (hasTemplate(template, clusterState) == false) {
// the required template is not yet installed in the given cluster state, we'll wait.
logger.debug("monitoring index template [{}] does not exist, so service cannot start", template);
return false;
}
}
// if we don't have the ingest pipeline, then it's going to fail anyway
if (hasIngestPipelines(clusterState) == false) {
logger.debug("monitoring ingest pipeline [{}] does not exist, so service cannot start", EXPORT_PIPELINE_NAME);
return false;
}
if (null != prepareAddAliasesTo2xIndices(clusterState)) {
logger.debug("old monitoring indexes exist without aliases, waiting for them to get new aliases");
return false;
}
logger.trace("monitoring index templates and pipelines are installed, service can start");
// everything is setup
return true;
}
/**
* When on the elected master, we setup all resources (mapping types, templates, and pipelines) before we attempt to run the exporter.
* If those resources do not exist, then we will create them.
*
* @param clusterState The current cluster state.
* @param templates All template names that should exist.
* @return {@code true} indicates that all resources are "ready" and the exporter can be used. {@code false} to stop and wait.
*/
private boolean setupIfElectedMaster(final ClusterState clusterState, final Map<String, String> templates) {
// we are on the elected master
// Check that there is nothing that could block metadata updates
if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) {
logger.debug("waiting until metadata writes are unblocked");
return false;
}
if (installingSomething.get() == true) {
logger.trace("already installing something, waiting for install to complete");
return false;
}
// build a list of runnables for everything that is missing, but do not start execution
final List<Runnable> asyncActions = new ArrayList<>();
final AtomicInteger pendingResponses = new AtomicInteger(0);
// Check that all necessary types exist for _xpack/monitoring/_bulk usage
final List<String> missingMappingTypes = Arrays.stream(MonitoringTemplateUtils.NEW_DATA_TYPES)
.filter((type) -> hasMappingType(type, clusterState) == false)
.collect(Collectors.toList());
// Check that each required template exist, installing it if needed
final List<Entry<String, String>> missingTemplates = templates.entrySet()
.stream()
.filter((e) -> hasTemplate(e.getKey(), clusterState) == false)
.collect(Collectors.toList());
if (missingMappingTypes.isEmpty() == false) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("type {} not found",
missingMappingTypes.stream().collect(Collectors.toList())));
for (final String type : missingMappingTypes) {
asyncActions.add(() -> putMappingType(type, new ResponseActionListener<>("type", type, pendingResponses)));
}
}
if (missingTemplates.isEmpty() == false) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("template {} not found",
missingTemplates.stream().map(Map.Entry::getKey).collect(Collectors.toList())));
for (Entry<String, String> template : missingTemplates) {
asyncActions.add(() -> putTemplate(template.getKey(), template.getValue(),
new ResponseActionListener<>("template", template.getKey(), pendingResponses)));
}
}
// if we don't have the ingest pipeline, then install it
if (hasIngestPipelines(clusterState) == false) {
logger.debug("pipeline [{}] not found", EXPORT_PIPELINE_NAME);
asyncActions.add(() -> putIngestPipeline(new ResponseActionListener<>("pipeline", EXPORT_PIPELINE_NAME, pendingResponses)));
} else {
logger.trace("pipeline [{}] found", EXPORT_PIPELINE_NAME);
}
IndicesAliasesRequest addAliasesTo2xIndices = prepareAddAliasesTo2xIndices(clusterState);
if (addAliasesTo2xIndices == null) {
logger.trace("there are no 2.x monitoring indices or they have all the aliases they need");
} else {
final List<String> monitoringIndices2x = addAliasesTo2xIndices.getAliasActions().stream()
.flatMap((a) -> Arrays.stream(a.indices()))
.collect(Collectors.toList());
logger.debug("there are 2.x monitoring indices {} and they are missing some aliases to make them compatible with 5.x",
monitoringIndices2x);
asyncActions.add(() -> client.execute(IndicesAliasesAction.INSTANCE, addAliasesTo2xIndices,
new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse response) {
responseReceived();
if (response.isAcknowledged()) {
logger.info("Added modern aliases to 2.x monitoring indices {}", monitoringIndices2x);
} else {
logger.info("Unable to add modern aliases to 2.x monitoring indices {}, response not acknowledged.",
monitoringIndices2x);
}
}
@Override
public void onFailure(Exception e) {
responseReceived();
logger.error((Supplier<?>)
() -> new ParameterizedMessage("Unable to add modern aliases to 2.x monitoring indices {}",
monitoringIndices2x), e);
}
private void responseReceived() {
if (pendingResponses.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
}
}
}
}));
}
if (asyncActions.size() > 0) {
if (installingSomething.compareAndSet(false, true)) {
pendingResponses.set(asyncActions.size());
asyncActions.forEach(Runnable::run);
} else {
// let the cluster catch up since requested installations may be ongoing
return false;
}
} else {
logger.debug("monitoring index templates and pipelines are installed on master node, service can start");
}
// everything is setup (or running)
return true;
}
/**
* Determine if the mapping {@code type} exists in the {@linkplain MonitoringTemplateUtils#DATA_INDEX data index}.
*
* @param type The data type to check (e.g., "kibana")
* @param clusterState The current cluster state
* @return {@code false} if the type mapping needs to be added.
*/
private boolean hasMappingType(final String type, final ClusterState clusterState) {
final IndexMetaData dataIndex = clusterState.getMetaData().getIndices().get(MonitoringTemplateUtils.DATA_INDEX);
// if the index does not exist, then the template will add it and the type; if the index does exist, then we need the type
return dataIndex == null || dataIndex.getMappings().containsKey(type);
}
/**
* Add the mapping {@code type} to the {@linkplain MonitoringTemplateUtils#DATA_INDEX data index}.
*
* @param type The data type to check (e.g., "kibana")
* @param listener The listener to use for handling the response
*/
private void putMappingType(final String type, final ActionListener<PutMappingResponse> listener) {
logger.debug("adding mapping type [{}] to [{}]", type, MonitoringTemplateUtils.DATA_INDEX);
final PutMappingRequest putMapping = new PutMappingRequest(MonitoringTemplateUtils.DATA_INDEX);
putMapping.type(type);
// avoid mapping at all; we use this index as a data cache rather than for search
putMapping.source("{\"enabled\":false}");
client.admin().indices().putMapping(putMapping, listener);
}
/**
* Determine if the ingest pipeline for {@link #EXPORT_PIPELINE_NAME} exists in the cluster or not.
*

View File

@ -3,10 +3,12 @@
"settings": {
"index.number_of_shards": 1,
"index.number_of_replicas": 1,
"index.codec": "best_compression",
"index.mapper.dynamic": false
"index.codec": "best_compression"
},
"mappings": {
"_default_": {
"enabled": false
},
"cluster_info": {
"enabled": false,
"_meta": {

View File

@ -1,216 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.hamcrest.Matchers.notNullValue;
@ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public abstract class AbstractExporterTemplateTestCase extends MonitoringIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MonitoringSettings.INTERVAL.getKey(), "-1");
for (Map.Entry<String, String> setting : exporterSettings().getAsMap().entrySet()) {
settings.put("xpack.monitoring.exporters._exporter." + setting.getKey(), setting.getValue());
}
return settings.build();
}
protected abstract Settings exporterSettings();
protected abstract void deleteTemplates() throws Exception;
protected abstract void deletePipeline() throws Exception;
protected abstract void putTemplate(String name) throws Exception;
protected abstract void putPipeline(String name) throws Exception;
protected abstract void assertTemplateExists(String name) throws Exception;
protected abstract void assertPipelineExists(String name) throws Exception;
protected abstract void assertTemplateNotUpdated(String name) throws Exception;
protected abstract void assertPipelineNotUpdated(String name) throws Exception;
public void testCreateWhenNoExistingTemplates() throws Exception {
internalCluster().startNode();
deleteTemplates();
deletePipeline();
doExporting();
logger.debug("--> templates does not exist: it should have been created in the current version");
for (String template : monitoringTemplateNames()) {
assertTemplateExists(template);
}
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
}
public void testCreateWhenExistingTemplatesAreOld() throws Exception {
internalCluster().startNode();
putTemplate(indexTemplateName());
putTemplate(dataTemplateName());
putPipeline(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> existing templates are old");
assertTemplateExists(dataTemplateName());
assertTemplateExists(indexTemplateName());
logger.debug("--> existing templates are old: new templates should be created");
for (String template : monitoringTemplateNames()) {
assertTemplateExists(template);
}
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
}
public void testCreateWhenExistingTemplateAreUpToDate() throws Exception {
internalCluster().startNode();
putTemplate(indexTemplateName());
putTemplate(dataTemplateName());
putPipeline(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> existing templates are up to date");
for (String template : monitoringTemplateNames()) {
assertTemplateExists(template);
}
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
logger.debug("--> existing templates has the same version: they should not be changed");
assertTemplateNotUpdated(indexTemplateName());
assertTemplateNotUpdated(dataTemplateName());
assertPipelineNotUpdated(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
}
protected void doExporting() throws Exception {
// TODO: these should be unit tests, not using guice
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
XPackLicenseState licenseState = internalCluster().getInstance(XPackLicenseState.class);
LicenseService licenseService = internalCluster().getInstance(LicenseService.class);
InternalClient client = internalCluster().getInstance(InternalClient.class);
Collector collector = new ClusterStatsCollector(clusterService.getSettings(), clusterService,
new MonitoringSettings(clusterService.getSettings(), clusterService.getClusterSettings()),
licenseState, client, licenseService);
Exporters exporters = internalCluster().getInstance(Exporters.class);
assertNotNull(exporters);
// Wait for exporting bulks to be ready to export
Runnable busy = () -> assertThat(exporters.openBulk(), notNullValue());
assertBusy(busy);
exporters.export(collector.collect());
}
private String dataTemplateName() {
MockDataIndexNameResolver resolver = new MockDataIndexNameResolver(MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.templateName();
}
private String indexTemplateName() {
MockTimestampedIndexNameResolver resolver =
new MockTimestampedIndexNameResolver(MonitoredSystem.ES, exporterSettings(), MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.templateName();
}
private String currentDataIndexName() {
MockDataIndexNameResolver resolver = new MockDataIndexNameResolver(MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.index(null);
}
private String currentTimestampedIndexName() {
MonitoringDoc doc = new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString());
doc.setTimestamp(System.currentTimeMillis());
MockTimestampedIndexNameResolver resolver =
new MockTimestampedIndexNameResolver(MonitoredSystem.ES, exporterSettings(), MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.index(doc);
}
/** Generates a basic template **/
protected static BytesReference generateTemplateSource(String name) throws IOException {
return jsonBuilder().startObject()
.field("template", name)
.startObject("settings")
.field("index.number_of_shards", 1)
.field("index.number_of_replicas", 1)
.endObject()
.startObject("mappings")
.startObject("_default_")
.startObject("_all")
.field("enabled", false)
.endObject()
.field("date_detection", false)
.startObject("properties")
.startObject("cluster_uuid")
.field("type", "keyword")
.endObject()
.startObject("timestamp")
.field("type", "date")
.field("format", "date_time")
.endObject()
.endObject()
.endObject()
.startObject("cluster_info")
.field("enabled", false)
.endObject()
.startObject("cluster_stats")
.startObject("properties")
.startObject("cluster_stats")
.field("type", "object")
.endObject()
.endObject()
.endObject()
.endObject()
.endObject().bytes();
}
}

View File

@ -6,9 +6,6 @@
package org.elasticsearch.xpack.monitoring.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -46,9 +43,6 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class ExportersTests extends ESTestCase {
@ -200,57 +194,6 @@ public class ExportersTests extends ESTestCase {
assertThat(settings, hasEntry("_name1.foo", "bar"));
}
public void testOpenBulkOnMaster() throws Exception {
Exporter.Factory factory = new MockFactory(false);
Exporter.Factory masterOnlyFactory = new MockFactory(true);
factories.put("mock", factory);
factories.put("mock_master_only", masterOnlyFactory);
Exporters exporters = new Exporters(Settings.builder()
.put("xpack.monitoring.exporters._name0.type", "mock")
.put("xpack.monitoring.exporters._name1.type", "mock_master_only")
.build(), factories, clusterService);
exporters.start();
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.isLocalNodeElectedMaster()).thenReturn(true);
when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(nodes).build());
ExportBulk bulk = exporters.openBulk();
assertThat(bulk, notNullValue());
verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).openBulk();
verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verify(exporters.getExporter("_name1"), times(1)).openBulk();
}
public void testExportNotOnMaster() throws Exception {
Exporter.Factory factory = new MockFactory(false);
Exporter.Factory masterOnlyFactory = new MockFactory(true);
factories.put("mock", factory);
factories.put("mock_master_only", masterOnlyFactory);
Exporters exporters = new Exporters(Settings.builder()
.put("xpack.monitoring.exporters._name0.type", "mock")
.put("xpack.monitoring.exporters._name1.type", "mock_master_only")
.build(), factories, clusterService);
exporters.start();
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.isLocalNodeElectedMaster()).thenReturn(false);
when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(nodes).build());
ExportBulk bulk = exporters.openBulk();
assertThat(bulk, notNullValue());
verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).openBulk();
verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verify(exporters.getExporter("_name1"), times(1)).isSingleton();
verifyNoMoreInteractions(exporters.getExporter("_name1"));
}
public void testEmptyPipeline() throws IOException {
String json = Exporter.emptyPipeline(XContentType.JSON).string();
@ -360,20 +303,15 @@ public class ExportersTests extends ESTestCase {
static class MockFactory implements Exporter.Factory {
private final boolean masterOnly;
public MockFactory(boolean masterOnly) {
this.masterOnly = masterOnly;
}
@Override
public Exporter create(Exporter.Config config) {
Exporter exporter = mock(Exporter.class);
when(exporter.name()).thenReturn(config.name());
when(exporter.masterOnly()).thenReturn(masterOnly);
when(exporter.openBulk()).thenReturn(mock(ExportBulk.class));
return exporter;
}
}
static class CountingExporter extends Exporter {

View File

@ -144,13 +144,19 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
assertThat(parameters.get("filter_path"), is("$NONE"));
}
private void doCheckWithStatusCode(final PublishableHttpResource resource, final String resourceBasePath, final String resourceName,
final RestStatus status,
final CheckResponse expected)
protected void doCheckWithStatusCode(final PublishableHttpResource resource, final String resourceBasePath, final String resourceName,
final RestStatus status,
final CheckResponse expected)
throws IOException {
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final Response response = response("GET", endpoint, status);
doCheckWithStatusCode(resource, endpoint, expected, response);
}
protected void doCheckWithStatusCode(final PublishableHttpResource resource, final String endpoint, final CheckResponse expected,
final Response response)
throws IOException {
when(client.performRequest("GET", endpoint, resource.getParameters())).thenReturn(response);
assertThat(resource.doCheck(client), is(expected));

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import org.apache.http.HttpEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.CheckResponse;
import org.apache.http.entity.StringEntity;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests {@link DataTypeMappingHttpResource}.
*/
public class DataTypeMappingHttpResourceTests extends AbstractPublishableHttpResourceTestCase {
private final String typeName = "my_type";
private final DataTypeMappingHttpResource resource = new DataTypeMappingHttpResource(owner, masterTimeout, typeName);
public void testDoCheckTrueFor404() throws IOException {
// if the index is not there, then we don't need to manually add the type
doCheckWithStatusCode(resource, "/" + DATA_INDEX + "/_mapping", typeName, notFoundCheckStatus(), CheckResponse.EXISTS);
}
public void testDoCheckTrue() throws IOException {
final String endpoint = "/" + DATA_INDEX + "/_mapping/" + typeName;
// success does't mean it exists unless the mapping exists! it returns {} if the index exists, but the type does not
final Response response = response("GET", endpoint, successfulCheckStatus());
final HttpEntity responseEntity = mock(HttpEntity.class);
final long validMapping = randomIntBetween(3, Integer.MAX_VALUE);
when(response.getEntity()).thenReturn(responseEntity);
when(responseEntity.getContentLength()).thenReturn(validMapping);
doCheckWithStatusCode(resource, endpoint, CheckResponse.EXISTS, response);
verify(responseEntity).getContentLength();
}
public void testDoCheckFalse() throws IOException {
final String endpoint = "/" + DATA_INDEX + "/_mapping/" + typeName;
// success does't mean it exists unless the mapping exists! it returns {} if the index exists, but the type does not
final Response response = response("GET", endpoint, successfulCheckStatus());
final HttpEntity responseEntity = mock(HttpEntity.class);
final long invalidMapping = randomIntBetween(Integer.MIN_VALUE, 2);
when(response.getEntity()).thenReturn(responseEntity);
when(responseEntity.getContentLength()).thenReturn(invalidMapping);
doCheckWithStatusCode(resource, endpoint, CheckResponse.DOES_NOT_EXIST, response);
verify(responseEntity).getContentLength();
}
public void testDoCheckNullWithException() throws IOException {
assertCheckWithException(resource, "/" + DATA_INDEX + "/_mapping", typeName);
}
public void testDoPublishTrue() throws IOException {
assertPublishSucceeds(resource, "/" + DATA_INDEX + "/_mapping", typeName, StringEntity.class);
}
public void testDoPublishFalse() throws IOException {
assertPublishFails(resource, "/" + DATA_INDEX + "/_mapping", typeName, StringEntity.class);
}
public void testDoPublishFalseWithException() throws IOException {
assertPublishWithException(resource, "/" + DATA_INDEX + "/_mapping", typeName, StringEntity.class);
}
public void testParameters() {
final Map<String, String> parameters = resource.getParameters();
if (masterTimeout != null) {
assertThat(parameters.get("master_timeout"), is(masterTimeout.toString()));
}
assertThat(parameters.size(), is(masterTimeout == null ? 0 : 1));
}
}

View File

@ -46,6 +46,7 @@ import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -56,6 +57,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.FILTER_PATH_NONE;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@ -64,6 +66,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
@ -87,13 +90,15 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
public void testExport() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
final Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
@ -105,11 +110,14 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
bwcIndexesExist, bwcAliasesExist);
assertBulk(webServer, nbDocs);
}
public void testExportWithHeaders() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
@ -125,7 +133,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
headers.put("Array-Check", array);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
@ -140,12 +149,16 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist, headers, null);
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
bwcIndexesExist, bwcAliasesExist,
headers, null);
assertBulk(webServer, nbDocs, headers, null);
}
public void testExportWithBasePath() throws Exception {
final boolean useHeaders = randomBoolean();
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
@ -163,7 +176,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
String basePath = "path/to";
@ -196,12 +210,15 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist, headers,
basePath);
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
bwcIndexesExist, bwcAliasesExist,
headers, basePath);
assertBulk(webServer, nbDocs, headers, basePath);
}
public void testHostChangeReChecksTemplate() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
@ -212,14 +229,16 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer));
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
internalCluster().startNode(builder);
export(Collections.singletonList(newRandomMonitoringDoc()));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
assertBulk(webServer);
try (MockWebServer secondWebServer = createMockWebServer()) {
@ -227,6 +246,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
Settings.builder().putArray("xpack.monitoring.exporters._http.host", getFormattedAddress(secondWebServer))));
enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT);
enqueueMappingTypeResponses(secondWebServer, !typeMappingsExistAlready);
// pretend that one of the templates is missing
for (Tuple<String, String> template : monitoringTemplates()) {
if (template.v1().contains(MonitoringBulkTimestampedResolver.Data.DATA)) {
@ -245,6 +265,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
export(Collections.singletonList(newRandomMonitoringDoc()));
assertMonitorVersion(secondWebServer);
assertMonitorMappingTypes(secondWebServer, !typeMappingsExistAlready, null, null);
for (Tuple<String, String> template : monitoringTemplates()) {
MockRequest recordedRequest = secondWebServer.takeRequest();
@ -286,6 +307,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
public void testDynamicIndexFormatChange() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
@ -298,13 +320,17 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
internalCluster().startNode(builder);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
MonitoringDoc doc = newRandomMonitoringDoc();
export(Collections.singletonList(doc));
assertMonitorResources(webServer, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
bwcIndexesExist, bwcAliasesExist);
MockRequest recordedRequest = assertBulk(webServer);
@SuppressWarnings("unchecked")
@ -321,7 +347,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
.setTransientSettings(Settings.builder().put("xpack.monitoring.exporters._http.index.name.time_format", newTimeFormat)));
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, true, true, false, false);
enqueueSetupResponses(webServer, true, true, true, false, false);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
doc = newRandomMonitoringDoc();
@ -330,7 +356,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
String expectedMonitoringIndex = ".monitoring-es-" + MonitoringTemplateUtils.TEMPLATE_VERSION + "-"
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp());
assertMonitorResources(webServer, true, true, false, false);
assertMonitorResources(webServer, true, true, true, false, false);
recordedRequest = assertBulk(webServer);
bytes = recordedRequest.getBody().getBytes(StandardCharsets.UTF_8);
@ -357,22 +383,58 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertHeaders(request, customHeaders);
}
private void assertMonitorResources(final MockWebServer webServer, final boolean templateAlreadyExists,
final boolean pipelineAlreadyExists, boolean bwcIndexesExist, boolean bwcAliasesExist) throws Exception {
assertMonitorResources(webServer, templateAlreadyExists, pipelineAlreadyExists, bwcIndexesExist, bwcAliasesExist, null, null);
private void assertMonitorResources(final MockWebServer webServer,
final boolean typeMappingsExistAlready,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists,
final boolean bwcIndexesExist, final boolean bwcAliasesExist) throws Exception {
assertMonitorResources(webServer, typeMappingsExistAlready, templateAlreadyExists, pipelineAlreadyExists,
bwcIndexesExist, bwcAliasesExist, null, null);
}
private void assertMonitorResources(final MockWebServer webServer, final boolean templateAlreadyExists,
final boolean pipelineAlreadyExists, boolean bwcIndexesExist, boolean bwcAliasesExist,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
private void assertMonitorResources(final MockWebServer webServer,
final boolean typeMappingsExistAlready,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists,
boolean bwcIndexesExist, boolean bwcAliasesExist,
@Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
assertMonitorVersion(webServer, customHeaders, basePath);
assertMonitorMappingTypes(webServer, typeMappingsExistAlready, customHeaders, basePath);
assertMonitorTemplates(webServer, templateAlreadyExists, customHeaders, basePath);
assertMonitorPipelines(webServer, pipelineAlreadyExists, customHeaders, basePath);
assertMonitorBackwardsCompatibilityAliases(webServer, bwcIndexesExist && false == bwcAliasesExist, customHeaders, basePath);
}
private void assertMonitorTemplates(final MockWebServer webServer, final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
private void assertMonitorMappingTypes(final MockWebServer webServer,
final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
MockRequest request;
for (String type : monitoringProductTypes()) {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/" + DATA_INDEX + "/_mapping/" + type));
assertThat(request.getUri().getQuery(), nullValue());
assertHeaders(request, customHeaders);
if (alreadyExists == false) {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/" + DATA_INDEX + "/_mapping/" + type));
assertThat(request.getUri().getQuery(), nullValue());
assertThat(request.getBody(), equalTo("{\"enabled\":false}"));
assertHeaders(request, customHeaders);
}
}
}
private void assertMonitorTemplates(final MockWebServer webServer,
final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
MockRequest request;
@ -524,6 +586,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
return docs;
}
private List<String> monitoringProductTypes() {
return Arrays.asList("kibana");
}
private String basePathToAssertablePrefix(@Nullable String basePath) {
if (basePath == null) {
return "";
@ -550,13 +616,41 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
.field("number", v.toString()).endObject().endObject().bytes().utf8ToString()));
}
private void enqueueSetupResponses(MockWebServer webServer, boolean templatesAlreadyExists, boolean pipelineAlreadyExists,
boolean bwcIndexesExist, boolean bwcAliasesExist) throws IOException {
private void enqueueSetupResponses(MockWebServer webServer,
boolean typeMappingsAlreadyExist,
boolean templatesAlreadyExists, boolean pipelineAlreadyExists,
boolean bwcIndexesExist, boolean bwcAliasesExist) throws IOException {
enqueueMappingTypeResponses(webServer, typeMappingsAlreadyExist);
enqueueTemplateResponses(webServer, templatesAlreadyExists);
enqueuePipelineResponses(webServer, pipelineAlreadyExists);
enqueueBackwardsCompatibilityAliasResponse(webServer, bwcIndexesExist, bwcAliasesExist);
}
private void enqueueMappingTypeResponses(final MockWebServer webServer, final boolean alreadyExists) throws IOException {
if (alreadyExists) {
enqueueMappingTypeResponsesExistsAlreadyOrWillBeCreated(webServer);
} else {
enqueueMappingTypeResponsesDoesNotExistYet(webServer);
}
}
private void enqueueMappingTypeResponsesDoesNotExistYet(final MockWebServer webServer) throws IOException {
for (String type : monitoringProductTypes()) {
enqueueResponse(webServer, 200, "{}");
enqueueResponse(webServer, 200, "type [" + type + "] created");
}
}
private void enqueueMappingTypeResponsesExistsAlreadyOrWillBeCreated(final MockWebServer webServer) throws IOException {
for (final String type : monitoringProductTypes()) {
if (randomBoolean()) {
enqueueResponse(webServer, 200, "{\".monitoring-data-2\":{\"" + type + "\":{\"enabled\":false}}}");
} else {
enqueueResponse(webServer, 404, "index does not exist; template will create it");
}
}
}
private void enqueueTemplateResponses(final MockWebServer webServer, final boolean alreadyExists) throws IOException {
if (alreadyExists) {
enqueueTemplateResponsesExistsAlready(webServer);

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMapOf;
import static org.mockito.Matchers.eq;
@ -37,6 +38,10 @@ import static org.mockito.Mockito.when;
*/
public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTestCase {
/**
* kibana, logstash, beats (This PR only really covers kibana, but a follow-on will add logstash and eventually we'll support beats)
*/
private final int EXPECTED_TYPES = 1;
private final int EXPECTED_TEMPLATES = 4;
private final RestClient client = mock(RestClient.class);
@ -60,13 +65,118 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
verifyNoMoreInteractions(client);
}
public void testTemplateCheckBlocksAfterSuccessfulVersion() throws IOException {
// COMMENTED OUT CODE WILL BE USED IN FOLLOW-UP PR FOR "logstash" type
public void testTypeMappingCheckBlocksAfterSuccessfulVersion() throws IOException {
final Exception exception = failureGetException();
// final boolean firstSucceeds = randomBoolean();
int expectedGets = 1;
int expectedPuts = 0;
whenValidVersionResponse();
// failure in the middle of various templates being checked/published; suggests a node dropped
// if (firstSucceeds) {
// final boolean successfulFirst = randomBoolean();
// // -2 from one success + a necessary failure after it!
// final int extraPasses = Math.max(randomIntBetween(0, EXPECTED_TYPES - 2), 0);
// final int successful = randomIntBetween(0, extraPasses);
// final int unsuccessful = extraPasses - successful;
//
// final Response first = successfulFirst ? successfulGetTypeMappingResponse() : unsuccessfulGetTypeMappingResponse();
//
// final List<Response> otherResponses = getTypeMappingResponses(successful, unsuccessful);
//
// // last check fails implies that N - 2 publishes succeeded!
// when(client.performRequest(eq("GET"), startsWith("/" + DATA_INDEX + "/_mapping/"), anyMapOf(String.class, String.class)))
// .thenReturn(first, otherResponses.toArray(new Response[otherResponses.size()]))
// .thenThrow(exception);
// whenSuccessfulPutTypeMappings(otherResponses.size() + 1);
//
// expectedGets += 1 + successful + unsuccessful;
// expectedPuts = (successfulFirst ? 0 : 1) + unsuccessful;
// } else {
when(client.performRequest(eq("GET"), startsWith("/" + DATA_INDEX + "/_mapping/"), anyMapOf(String.class, String.class)))
.thenThrow(exception);
// }
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(expectedGets);
verifyPutTypeMappings(expectedPuts);
verifyNoMoreInteractions(client);
}
// COMMENTED OUT CODE WILL BE USED IN FOLLOW-UP PR FOR "logstash" type
public void testTypeMappingPublishBlocksAfterSuccessfulVersion() throws IOException {
final Exception exception = failurePutException();
// final boolean firstSucceeds = randomBoolean();
int expectedGets = 1;
int expectedPuts = 1;
whenValidVersionResponse();
// failure in the middle of various templates being checked/published; suggests a node dropped
// if (firstSucceeds) {
// final Response firstSuccess = successfulPutResponse();
// // -2 from one success + a necessary failure after it!
// final int extraPasses = randomIntBetween(0, EXPECTED_TYPES - 2);
// final int successful = randomIntBetween(0, extraPasses);
// final int unsuccessful = extraPasses - successful;
//
// final List<Response> otherResponses = successfulPutResponses(unsuccessful);
//
// // first one passes for sure, so we need an extra "unsuccessful" GET
// whenGetTypeMappingResponse(successful, unsuccessful + 2);
//
// // previous publishes must have succeeded
// when(client.performRequest(eq("PUT"),
// startsWith("/" + DATA_INDEX + "/_mapping/"),
// anyMapOf(String.class, String.class),
// any(HttpEntity.class)))
// .thenReturn(firstSuccess, otherResponses.toArray(new Response[otherResponses.size()]))
// .thenThrow(exception);
//
// // GETs required for each PUT attempt (first is guaranteed "unsuccessful")
// expectedGets += successful + unsuccessful + 1;
// // unsuccessful are PUT attempts + the guaranteed successful PUT (first)
// expectedPuts += unsuccessful + 1;
// } else {
// fail the check so that it has to attempt the PUT
whenGetTypeMappingResponse(0, 1);
when(client.performRequest(eq("PUT"),
startsWith("/" + DATA_INDEX + "/_mapping/"),
anyMapOf(String.class, String.class),
any(HttpEntity.class)))
.thenThrow(exception);
// }
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(expectedGets);
verifyPutTypeMappings(expectedPuts);
verifyNoMoreInteractions(client);
}
public void testTemplateCheckBlocksAfterSuccessfulTypeMapping() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final Exception exception = failureGetException();
final boolean firstSucceeds = randomBoolean();
int expectedGets = 1;
int expectedPuts = 0;
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
// failure in the middle of various templates being checked/published; suggests a node dropped
if (firstSucceeds) {
@ -95,22 +205,28 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(expectedGets);
verifyPutTemplates(expectedPuts);
verifyNoMoreInteractions(client);
}
public void testTemplatePublishBlocksAfterSuccessfulVersion() throws IOException {
public void testTemplatePublishBlocksAfterSuccessfulTypeMapping() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final Exception exception = failurePutException();
final boolean firstSucceeds = randomBoolean();
int expectedGets = 1;
int expectedPuts = 1;
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
// failure in the middle of various templates being checked/published; suggests a node dropped
if (firstSucceeds) {
@ -144,21 +260,27 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(expectedGets);
verifyPutTemplates(expectedPuts);
verifyNoMoreInteractions(client);
}
public void testPipelineCheckBlocksAfterSuccessfulTemplates() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final Exception exception = failureGetException();
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(EXPECTED_TEMPLATES);
@ -168,10 +290,12 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
@ -180,11 +304,15 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
}
public void testPipelinePublishBlocksAfterSuccessfulTemplates() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final Exception exception = failurePutException();
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(EXPECTED_TEMPLATES);
// pipeline can't be there
@ -199,10 +327,12 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
@ -211,12 +341,16 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
}
public void testSuccessfulChecks() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final int successfulGetPipelines = randomIntBetween(0, 1);
final int unsuccessfulGetPipelines = 1 - successfulGetPipelines;
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(unsuccessfulGetTemplates);
whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines);
@ -230,6 +364,8 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
assertFalse(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
@ -258,8 +394,50 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
return response("GET", "/_get_something", notFoundCheckStatus());
}
private Response successfulGetTypeMappingResponse() {
final Response response;
if (randomBoolean()) {
// it returned 200, but we also need it to contain _something_ in the JSON {...}
final HttpEntity entity = new StringEntity("{\"" + DATA_INDEX + "\":{}}", ContentType.APPLICATION_JSON);
response = successfulGetResponse();
when(response.getEntity()).thenReturn(entity);
} else {
// simulates the index does not exist
response = unsuccessfulGetResponse();
}
return response;
}
private Response unsuccessfulGetTypeMappingResponse() {
// "unsuccessful" for type mappings is a response code 200, but the response is literally "{}"
final Response response = successfulGetResponse();
final HttpEntity entity = new StringEntity("{}", ContentType.APPLICATION_JSON);
when(response.getEntity()).thenReturn(entity);
return response;
}
private List<Response> getTypeMappingResponses(final int successful, final int unsuccessful) {
final List<Response> responses = new ArrayList<>(successful + unsuccessful);
for (int i = 0; i < successful; ++i) {
responses.add(successfulGetTypeMappingResponse());
}
for (int i = 0; i < unsuccessful; ++i) {
responses.add(unsuccessfulGetTypeMappingResponse());
}
return responses;
}
private List<Response> getResponses(final int successful, final int unsuccessful) {
final List<Response> responses = new ArrayList<>(successful);
final List<Response> responses = new ArrayList<>(successful + unsuccessful);
for (int i = 0; i < successful; ++i) {
responses.add(successfulGetResponse());
@ -299,6 +477,35 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
when(client.performRequest(eq("GET"), eq("/"), anyMapOf(String.class, String.class))).thenReturn(versionResponse);
}
private void whenGetTypeMappingResponse(final int successful, final int unsuccessful) throws IOException {
final List<Response> gets = getTypeMappingResponses(successful, unsuccessful);
if (gets.size() == 1) {
when(client.performRequest(eq("GET"), startsWith("/" + DATA_INDEX + "/_mapping"), anyMapOf(String.class, String.class)))
.thenReturn(gets.get(0));
} else {
when(client.performRequest(eq("GET"), startsWith("/" + DATA_INDEX + "/_mapping"), anyMapOf(String.class, String.class)))
.thenReturn(gets.get(0), gets.subList(1, gets.size()).toArray(new Response[gets.size() - 1]));
}
}
private void whenSuccessfulPutTypeMappings(final int successful) throws IOException {
final List<Response> successfulPuts = successfulPutResponses(successful);
// empty is possible if they all exist
if (successful == 1) {
when(client.performRequest(eq("PUT"),
startsWith("/" + DATA_INDEX + "/_mapping"),
anyMapOf(String.class, String.class), any(HttpEntity.class)))
.thenReturn(successfulPuts.get(0));
} else if (successful > 1) {
when(client.performRequest(eq("PUT"),
startsWith("/" + DATA_INDEX + "/_mapping"),
anyMapOf(String.class, String.class), any(HttpEntity.class)))
.thenReturn(successfulPuts.get(0), successfulPuts.subList(1, successful).toArray(new Response[successful - 1]));
}
}
private void whenGetTemplates(final int successful, final int unsuccessful) throws IOException {
final List<Response> gets = getResponses(successful, unsuccessful);
@ -375,6 +582,18 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
verify(client).performRequest(eq("GET"), eq("/"), anyMapOf(String.class, String.class));
}
private void verifyGetTypeMappings(final int called) throws IOException {
verify(client, times(called))
.performRequest(eq("GET"), startsWith("/" + DATA_INDEX + "/_mapping"), anyMapOf(String.class, String.class));
}
private void verifyPutTypeMappings(final int called) throws IOException {
verify(client, times(called)).performRequest(eq("PUT"), // method
startsWith("/" + DATA_INDEX + "/_mapping"), // endpoint
anyMapOf(String.class, String.class), // parameters (e.g., timeout)
any(HttpEntity.class)); // raw template
}
private void verifyGetTemplates(final int called) throws IOException {
verify(client, times(called)).performRequest(eq("GET"), startsWith("/_template/"), anyMapOf(String.class, String.class));
}

View File

@ -284,6 +284,10 @@ public class HttpExporterTests extends ESTestCase {
final List<HttpResource> resources = multiResource.getResources();
final int version = (int)resources.stream().filter((resource) -> resource instanceof VersionHttpResource).count();
final List<DataTypeMappingHttpResource> typeMappings =
resources.stream().filter((resource) -> resource instanceof DataTypeMappingHttpResource)
.map(DataTypeMappingHttpResource.class::cast)
.collect(Collectors.toList());
final List<TemplateHttpResource> templates =
resources.stream().filter((resource) -> resource instanceof TemplateHttpResource)
.map(TemplateHttpResource.class::cast)
@ -298,8 +302,10 @@ public class HttpExporterTests extends ESTestCase {
.collect(Collectors.toList());
// expected number of resources
assertThat(multiResource.getResources().size(), equalTo(version + templates.size() + pipelines.size() + bwc.size()));
assertThat(multiResource.getResources().size(),
equalTo(version + typeMappings.size() + templates.size() + pipelines.size() + bwc.size()));
assertThat(version, equalTo(1));
assertThat(typeMappings, hasSize(1));
assertThat(templates, hasSize(4));
assertThat(pipelines, hasSize(useIngest ? 1 : 0));
assertThat(bwc, hasSize(1));

View File

@ -9,7 +9,9 @@ import org.apache.http.HttpEntity;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.SuppressLoggerChecks;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.CheckResponse;
@ -58,8 +60,7 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
when(client.performRequest("GET", endpoint, resource.getParameters())).thenReturn(response);
assertThat(resource.checkForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType),
is(CheckResponse.ERROR));
sometimesAssertSimpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, CheckResponse.ERROR, response);
verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("GET", endpoint, resource.getParameters());
@ -73,11 +74,11 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
final RestStatus failedStatus = failedCheckStatus();
final ResponseException responseException = responseException("GET", endpoint, failedStatus);
final Exception e = randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
final Response response = e == responseException ? responseException.getResponse() : null;
when(client.performRequest("GET", endpoint, resource.getParameters())).thenThrow(e);
assertThat(resource.checkForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType),
is(CheckResponse.ERROR));
sometimesAssertSimpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, CheckResponse.ERROR, response);
verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("GET", endpoint, resource.getParameters());
@ -139,8 +140,7 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
when(client.performRequest("GET", endpoint, resource.getParameters())).thenReturn(response);
assertThat(resource.checkForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType),
is(expected));
sometimesAssertSimpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, expected, response);
verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("GET", endpoint, resource.getParameters());
@ -188,4 +188,21 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
verifyNoMoreInteractions(client, response, logger, entity);
}
private void sometimesAssertSimpleCheckForResource(final RestClient client, final Logger logger,
final String resourceBasePath,
final String resourceName, final String resourceType,
final CheckResponse expected, final Response response) {
// sometimes use the simple check
if (randomBoolean()) {
assertThat(resource.simpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType),
is(expected));
} else {
final Tuple<CheckResponse, Response> responseTuple =
resource.checkForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType);
assertThat(responseTuple.v1(), is(expected));
assertThat(responseTuple.v2(), is(response));
}
}
}

View File

@ -5,58 +5,230 @@
*/
package org.elasticsearch.xpack.monitoring.exporter.local;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.xpack.monitoring.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Collections;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.notNullValue;
public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase {
@ESIntegTestCase.ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class LocalExporterTemplateTests extends MonitoringIntegTestCase {
private final Settings localExporter = Settings.builder().put("type", LocalExporter.TYPE).build();
@Override
protected Settings exporterSettings() {
return Settings.builder().put("type", LocalExporter.TYPE).build();
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._exporter.type", LocalExporter.TYPE);
return settings.build();
}
@Override
protected void deleteTemplates() throws Exception {
waitNoPendingTasksOnAll();
cluster().wipeAllTemplates(Collections.emptySet());
public void testCreateWhenExistingTemplatesAreOld() throws Exception {
internalCluster().startNode();
// put an old variant of the monitoring-data-# index so that types need to be added
final CreateIndexRequest request = new CreateIndexRequest(MonitoringTemplateUtils.DATA_INDEX);
request.settings(Settings.builder().put("index.mapper.dynamic", false).build());
// notably absent are: kibana, logstash, and beats
request.mapping("cluster_info", "{\"enabled\": false}");
request.mapping("node", "{\"enabled\": false}");
request.mapping("fake", "{\"enabled\": false}");
client().admin().indices().create(request).actionGet();
putTemplate(indexTemplateName());
putTemplate(dataTemplateName());
putPipeline(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> existing templates are old");
assertTemplateExists(dataTemplateName());
assertTemplateExists(indexTemplateName());
logger.debug("--> existing templates are old: new templates should be created");
for (String template : monitoringTemplateNames()) {
assertTemplateExists(template);
}
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
assertIndicesExists(currentTimestampedIndexName());
// ensure that it added mapping types to monitoring-data-2, without throwing away the index
assertMapping(MonitoringTemplateUtils.DATA_INDEX, "fake");
for (final String type : MonitoringTemplateUtils.NEW_DATA_TYPES) {
assertMapping(MonitoringTemplateUtils.DATA_INDEX, type);
}
}
@Override
protected void deletePipeline() throws Exception {
waitNoPendingTasksOnAll();
cluster().client().admin().cluster().deletePipeline(new DeletePipelineRequest(Exporter.EXPORT_PIPELINE_NAME));
private void assertMapping(final String index, final String type) throws Exception {
GetMappingsResponse response = client().admin().indices().prepareGetMappings(index).setTypes(type).get();
ImmutableOpenMap<String, MappingMetaData> mappings = response.getMappings().get(index);
assertThat(mappings, notNullValue());
MappingMetaData mappingMetaData = mappings.get(type);
assertThat(mappingMetaData, notNullValue());
}
@Override
protected void putTemplate(String name) throws Exception {
public void testCreateWhenExistingTemplateAreUpToDate() throws Exception {
internalCluster().startNode();
putTemplate(indexTemplateName());
putTemplate(dataTemplateName());
putPipeline(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> existing templates are up to date");
for (String template : monitoringTemplateNames()) {
assertTemplateExists(template);
}
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
logger.debug("--> existing templates has the same version: they should not be changed");
assertTemplateNotUpdated(indexTemplateName());
assertTemplateNotUpdated(dataTemplateName());
assertPipelineNotUpdated(Exporter.EXPORT_PIPELINE_NAME);
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
}
protected void doExporting() throws Exception {
// TODO: these should be unit tests, not using guice (copied from now-deleted AbstractExporterTemplateTestCase)
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
XPackLicenseState licenseState = internalCluster().getInstance(XPackLicenseState.class);
LicenseService licenseService = internalCluster().getInstance(LicenseService.class);
InternalClient client = internalCluster().getInstance(InternalClient.class);
Collector collector = new ClusterStatsCollector(clusterService.getSettings(), clusterService,
new MonitoringSettings(clusterService.getSettings(), clusterService.getClusterSettings()),
licenseState, client, licenseService);
Exporters exporters = internalCluster().getInstance(Exporters.class);
assertNotNull(exporters);
Exporter exporter = exporters.getExporter("_exporter");
// Wait for exporting bulks to be ready to export
Runnable busy = () -> assertThat(exporter.openBulk(), notNullValue());
assertBusy(busy);
exporters.export(collector.collect());
}
private String dataTemplateName() {
MockDataIndexNameResolver resolver = new MockDataIndexNameResolver(MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.templateName();
}
private String indexTemplateName() {
MockTimestampedIndexNameResolver resolver =
new MockTimestampedIndexNameResolver(MonitoredSystem.ES, localExporter, MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.templateName();
}
private String currentDataIndexName() {
return MonitoringTemplateUtils.DATA_INDEX;
}
private String currentTimestampedIndexName() {
MonitoringDoc doc = new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString());
doc.setTimestamp(System.currentTimeMillis());
MockTimestampedIndexNameResolver resolver =
new MockTimestampedIndexNameResolver(MonitoredSystem.ES, localExporter, MonitoringTemplateUtils.TEMPLATE_VERSION);
return resolver.index(doc);
}
/** Generates a basic template **/
private BytesReference generateTemplateSource(String name) throws IOException {
return jsonBuilder().startObject()
.field("template", name)
.startObject("settings")
.field("index.number_of_shards", 1)
.field("index.number_of_replicas", 1)
.endObject()
.startObject("mappings")
.startObject("_default_")
.startObject("_all")
.field("enabled", false)
.endObject()
.field("date_detection", false)
.startObject("properties")
.startObject("cluster_uuid")
.field("type", "keyword")
.endObject()
.startObject("timestamp")
.field("type", "date")
.field("format", "date_time")
.endObject()
.endObject()
.endObject()
.startObject("cluster_info")
.field("enabled", false)
.endObject()
.startObject("cluster_stats")
.startObject("properties")
.startObject("cluster_stats")
.field("type", "object")
.endObject()
.endObject()
.endObject()
.endObject()
.endObject().bytes();
}
private void putTemplate(String name) throws Exception {
waitNoPendingTasksOnAll();
assertAcked(client().admin().indices().preparePutTemplate(name).setSource(generateTemplateSource(name)).get());
}
@Override
protected void putPipeline(String name) throws Exception {
private void putPipeline(String name) throws Exception {
waitNoPendingTasksOnAll();
assertAcked(client().admin().cluster().preparePutPipeline(name, Exporter.emptyPipeline(XContentType.JSON).bytes()).get());
}
@Override
protected void assertTemplateExists(String name) throws Exception {
private void assertTemplateExists(String name) throws Exception {
waitNoPendingTasksOnAll();
waitForMonitoringTemplate(name);
}
@Override
protected void assertPipelineExists(String name) throws Exception {
private void assertPipelineExists(String name) throws Exception {
waitNoPendingTasksOnAll();
assertPipelineInstalled(name);
}
@ -73,21 +245,14 @@ public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase
}, 60, TimeUnit.SECONDS);
}
@Override
protected void assertTemplateNotUpdated(String name) throws Exception {
private void assertTemplateNotUpdated(String name) throws Exception {
waitNoPendingTasksOnAll();
assertTemplateExists(name);
}
@Override
protected void assertPipelineNotUpdated(String name) throws Exception {
private void assertPipelineNotUpdated(String name) throws Exception {
waitNoPendingTasksOnAll();
assertPipelineExists(name);
}
@AwaitsFix(bugUrl = "testing locally to determine why this is a race condition on Jenkins")
@Override
public void testCreateWhenNoExistingTemplates() throws Exception {
}
}