[Monitoring] Use Exporter to create watches in Monitoring Cluster (elastic/x-pack-elasticsearch#994)

By creating the watches via the exporter, we get to afford ourselves
with a much more automatic and simpler set of security permissions.

This does limit us in a few ways (e.g., every exporter has to deal with
cluster alerts itself, which means that newer releases of Kibana cannot
help by adding newer cluster alerts for older, still-monitored
clusters).

Original commit: elastic/x-pack-elasticsearch@448ef313c3
This commit is contained in:
Chris Earle 2017-04-18 12:59:46 -04:00 committed by GitHub
parent a11f77aea0
commit c27bb16141
36 changed files with 2647 additions and 193 deletions

View File

@ -195,6 +195,11 @@ public class XPackLicenseState {
listeners.add(Objects.requireNonNull(runnable));
}
/** Remove a listener */
public void removeListener(Runnable runnable) {
listeners.remove(runnable);
}
/** Return the current license type. */
public OperationMode getOperationMode() {
return status.mode;
@ -326,12 +331,22 @@ public class XPackLicenseState {
/**
* Monitoring is always available as long as there is a valid license
*
* @return true
* @return true if the license is active
*/
public boolean isMonitoringAllowed() {
return status.active;
}
/**
* Monitoring Cluster Alerts requires the equivalent license to use Watcher.
*
* @return {@link #isWatcherAllowed()}
* @see #isWatcherAllowed()
*/
public boolean isMonitoringClusterAlertsAllowed() {
return isWatcherAllowed();
}
/**
* Determine if the current license allows the retention of indices to be modified.
* <p>

View File

@ -114,8 +114,8 @@ public class Monitoring implements ActionPlugin {
final SSLService dynamicSSLService = sslService.createDynamicSSLService();
Map<String, Exporter.Factory> exporterFactories = new HashMap<>();
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService, threadPool.getThreadContext()));
exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, cleanerService));
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, threadPool.getThreadContext());
exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, cleanerService));
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, licenseState, threadPool.getThreadContext());
Set<Collector> collectors = new HashSet<>();
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));

View File

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
/**
* {@code ClusterAlertsUtil} provides static methods to easily load the JSON resources that
* represent watches for Cluster Alerts.
*/
public class ClusterAlertsUtil {
/**
* The name of the Watch resource when substituted by the high-level watch ID.
*/
private static final String WATCH_FILE = "/monitoring/watches/%s.json";
/**
* Replace the <code>${monitoring.watch.cluster_uuid}</code> field in the watches.
*/
private static final Pattern CLUSTER_UUID_PROPERTY =
Pattern.compile(Pattern.quote("${monitoring.watch.cluster_uuid}"));
/**
* Replace the <code>${monitoring.watch.id}</code> field in the watches.
*/
private static final Pattern WATCH_ID_PROPERTY =
Pattern.compile(Pattern.quote("${monitoring.watch.id}"));
/**
* Replace the <code>${monitoring.watch.unique_id}</code> field in the watches.
*
* @see #createUniqueWatchId(ClusterService, String)
*/
private static final Pattern UNIQUE_WATCH_ID_PROPERTY =
Pattern.compile(Pattern.quote("${monitoring.watch.unique_id}"));
/**
* An unsorted list of Watch IDs representing resource files for Monitoring Cluster Alerts.
*/
public static final String[] WATCH_IDS = {
"elasticsearch_cluster_status",
"elasticsearch_version_mismatch",
"kibana_version_mismatch",
"logstash_version_mismatch"
};
/**
* Create a unique identifier for the watch and cluster.
*
* @param clusterService The cluster service used to fetch the latest cluster state.
* @param watchId The watch's ID.
* @return Never {@code null}.
* @see #WATCH_IDS
*/
public static String createUniqueWatchId(final ClusterService clusterService, final String watchId) {
return createUniqueWatchId(clusterService.state().metaData().clusterUUID(), watchId);
}
/**
* Create a unique identifier for the watch and cluster.
*
* @param clusterUuid The cluster's UUID.
* @param watchId The watch's ID.
* @return Never {@code null}.
* @see #WATCH_IDS
*/
private static String createUniqueWatchId(final String clusterUuid, final String watchId) {
return clusterUuid + "_" + watchId;
}
/**
* Create a unique watch ID and load the {@code watchId} resource by replacing variables,
* such as the cluster's UUID.
*
* @param clusterService The cluster service used to fetch the latest cluster state.
* @param watchId The watch's ID.
* @return Never {@code null}. The key is the unique watch ID. The value is the Watch source.
* @throws RuntimeException if the watch does not exist
*/
public static String loadWatch(final ClusterService clusterService, final String watchId) {
final String resource = String.format(Locale.ROOT, WATCH_FILE, watchId);
try {
final String clusterUuid = clusterService.state().metaData().clusterUUID();
final String uniqueWatchId = createUniqueWatchId(clusterUuid, watchId);
// load the resource as-is
String source = loadResource(resource).utf8ToString();
source = CLUSTER_UUID_PROPERTY.matcher(source).replaceAll(clusterUuid);
source = WATCH_ID_PROPERTY.matcher(source).replaceAll(watchId);
source = UNIQUE_WATCH_ID_PROPERTY.matcher(source).replaceAll(uniqueWatchId);
return source;
} catch (final IOException e) {
throw new RuntimeException("Unable to load Watch [" + watchId + "]", e);
}
}
private static BytesReference loadResource(final String resource) throws IOException {
try (InputStream is = ClusterAlertsUtil.class.getResourceAsStream(resource)) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
return new BytesArray(out.toByteArray());
}
}
}
}

View File

@ -5,9 +5,11 @@
*/
package org.elasticsearch.xpack.monitoring.exporter;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import java.io.IOException;
@ -27,6 +29,10 @@ public abstract class Exporter implements AutoCloseable {
* Note: disabling it obviously loses any benefit of using it, but it does allow clusters that don't run with ingest to not use it.
*/
public static final String USE_INGEST_PIPELINE_SETTING = "use_ingest";
/**
* Every {@code Exporter} allows users to explicitly disable cluster alerts.
*/
public static final String CLUSTER_ALERTS_MANAGEMENT_SETTING = "cluster_alerts.management.enabled";
protected final Config config;
@ -109,12 +115,19 @@ public abstract class Exporter implements AutoCloseable {
private final String name;
private final String type;
private final boolean enabled;
private final Settings globalSettings;
private final Settings settings;
private final ClusterService clusterService;
private final XPackLicenseState licenseState;
public Config(String name, String type, Settings settings) {
public Config(String name, String type, Settings globalSettings, Settings settings,
ClusterService clusterService, XPackLicenseState licenseState) {
this.name = name;
this.type = type;
this.globalSettings = globalSettings;
this.settings = settings;
this.clusterService = clusterService;
this.licenseState = licenseState;
this.enabled = settings.getAsBoolean("enabled", true);
}
@ -130,10 +143,22 @@ public abstract class Exporter implements AutoCloseable {
return enabled;
}
public Settings globalSettings() {
return globalSettings;
}
public Settings settings() {
return settings;
}
public ClusterService clusterService() {
return clusterService;
}
public XPackLicenseState licenseState() {
return licenseState;
}
}
/** A factory for constructing {@link Exporter} instances.*/

View File

@ -9,12 +9,14 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter;
@ -35,15 +37,20 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
private final Map<String, Exporter.Factory> factories;
private final AtomicReference<Map<String, Exporter>> exporters;
private final ClusterService clusterService;
private final XPackLicenseState licenseState;
private final ThreadContext threadContext;
public Exporters(Settings settings, Map<String, Exporter.Factory> factories, ClusterService clusterService,
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
ClusterService clusterService, XPackLicenseState licenseState,
ThreadContext threadContext) {
super(settings);
this.factories = factories;
this.exporters = new AtomicReference<>(emptyMap());
this.threadContext = Objects.requireNonNull(threadContext);
this.clusterService = Objects.requireNonNull(clusterService);
this.licenseState = Objects.requireNonNull(licenseState);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS, this::setExportersSetting);
}
@ -92,6 +99,11 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
}
ExportBulk openBulk() {
if (clusterService.state().version() == ClusterState.UNKNOWN_VERSION) {
logger.trace("skipping exporters because the cluster state is not loaded");
return null;
}
List<ExportBulk> bulks = new ArrayList<>();
for (Exporter exporter : this) {
try {
@ -109,12 +121,12 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks, threadContext);
}
Map<String, Exporter> initExporters(Settings settings) {
Map<String, Exporter> initExporters(Settings exportersSettings) {
Set<String> singletons = new HashSet<>();
Map<String, Exporter> exporters = new HashMap<>();
boolean hasDisabled = false;
for (String name : settings.names()) {
Settings exporterSettings = settings.getAsSettings(name);
for (String name : exportersSettings.names()) {
Settings exporterSettings = exportersSettings.getAsSettings(name);
String type = exporterSettings.get("type");
if (type == null) {
throw new SettingsException("missing exporter type for [" + name + "] exporter");
@ -123,7 +135,7 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
if (factory == null) {
throw new SettingsException("unknown exporter type [" + type + "] set for exporter [" + name + "]");
}
Exporter.Config config = new Exporter.Config(name, type, exporterSettings);
Exporter.Config config = new Exporter.Config(name, type, settings, exporterSettings, clusterService, licenseState);
if (!config.enabled()) {
hasDisabled = true;
if (logger.isDebugEnabled()) {
@ -150,7 +162,9 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
// fallback on the default
//
if (exporters.isEmpty() && !hasDisabled) {
Exporter.Config config = new Exporter.Config("default_" + LocalExporter.TYPE, LocalExporter.TYPE, Settings.EMPTY);
Exporter.Config config =
new Exporter.Config("default_" + LocalExporter.TYPE, LocalExporter.TYPE, settings, Settings.EMPTY,
clusterService, licenseState);
exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config));
}

View File

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.license.XPackLicenseState;
import java.util.Objects;
import java.util.function.Supplier;
/**
* {@code ClusterAlertHttpResource}s allow the checking, uploading, and deleting of Watches to a remote cluster based on the current license
* state.
*/
public class ClusterAlertHttpResource extends PublishableHttpResource {
private static final Logger logger = Loggers.getLogger(ClusterAlertHttpResource.class);
/**
* License State is used to determine if we should even be add or delete our watches.
*/
private final XPackLicenseState licenseState;
/**
* The name of the Watch that is sent to the remote cluster.
*/
private final Supplier<String> watchId;
/**
* Provides a fully formed Watch (e.g., no variables that need replaced).
*/
private final Supplier<String> watch;
/**
* Create a new {@link ClusterAlertHttpResource}.
*
* @param resourceOwnerName The user-recognizable name.
* @param watchId The name of the watch, which is lazily loaded.
* @param watch The watch provider.
*/
public ClusterAlertHttpResource(final String resourceOwnerName,
final XPackLicenseState licenseState,
final Supplier<String> watchId, final Supplier<String> watch) {
// Watcher does not support master_timeout
super(resourceOwnerName, null, PublishableHttpResource.NO_BODY_PARAMETERS);
this.licenseState = Objects.requireNonNull(licenseState);
this.watchId = Objects.requireNonNull(watchId);
this.watch = Objects.requireNonNull(watch);
}
/**
* Determine if the current {@linkplain #watchId Watch} exists.
*/
@Override
protected CheckResponse doCheck(final RestClient client) {
// if we should be adding, then we need to check for existence
if (licenseState.isMonitoringClusterAlertsAllowed()) {
return simpleCheckForResource(client, logger,
"/_xpack/watcher/watch", watchId.get(), "monitoring cluster alert",
resourceOwnerName, "monitoring cluster");
}
// if we should be deleting, then just try to delete it (same level of effort as checking)
final boolean deleted = deleteResource(client, logger, "/_xpack/watcher/watch", watchId.get(),
"monitoring cluster alert",
resourceOwnerName, "monitoring cluster");
return deleted ? CheckResponse.EXISTS : CheckResponse.ERROR;
}
/**
* Publish the missing {@linkplain #watchId Watch}.
*/
@Override
protected boolean doPublish(final RestClient client) {
return putResource(client, logger,
"/_xpack/watcher/watch", watchId.get(), this::watchToHttpEntity, "monitoring cluster alert",
resourceOwnerName, "monitoring cluster");
}
/**
* Create a {@link HttpEntity} for the {@link #watch}.
*
* @return Never {@code null}.
*/
HttpEntity watchToHttpEntity() {
return new StringEntity(watch.get(), ContentType.APPLICATION_JSON);
}
}

View File

@ -65,7 +65,7 @@ public class DataTypeMappingHttpResource extends PublishableHttpResource {
final Tuple<CheckResponse, Response> resource =
checkForResource(client, logger,
"/" + DATA_INDEX + "/_mapping", typeName, "monitoring mapping type",
resourceOwnerName, "monitoring cluster");
resourceOwnerName, "monitoring cluster", GET_EXISTS, GET_DOES_NOT_EXIST);
// depending on the content, we need to flip the actual response
CheckResponse checkResponse = resource.v1();

View File

@ -20,6 +20,7 @@ import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.sniff.ElasticsearchHostsSniffer;
import org.elasticsearch.client.sniff.ElasticsearchHostsSniffer.Scheme;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
@ -31,16 +32,16 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.ssl.SSLService;
import javax.net.ssl.SSLContext;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -49,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
@ -157,6 +159,11 @@ public class HttpExporter extends Exporter {
*/
private final HttpResource resource;
/**
* Track whether cluster alerts are allowed or not between requests. This allows us to avoid wiring a listener and to lazily change it.
*/
private final AtomicBoolean clusterAlertsAllowed = new AtomicBoolean(false);
private final ResolversRegistry resolvers;
private final ThreadContext threadContext;
@ -231,7 +238,7 @@ public class HttpExporter extends Exporter {
this.defaultParams = createDefaultParams(config);
this.threadContext = threadContext;
// mark resources as dirty after any node failure
// mark resources as dirty after any node failure or license change
listener.setResource(resource);
}
@ -317,9 +324,14 @@ public class HttpExporter extends Exporter {
configureTemplateResources(config, resolvers, resourceOwnerName, resources);
// load the pipeline (this will get added to as the monitoring API version increases)
configurePipelineResources(config, resourceOwnerName, resources);
// alias .marvel-es-1-* indices
resources.add(new BackwardsCompatibilityAliasesResource(resourceOwnerName,
config.settings().getAsTime(ALIAS_TIMEOUT_SETTING, timeValueSeconds(30))));
// load the watches for cluster alerts if Watcher is available
configureClusterAlertsResources(config, resourceOwnerName, resources);
return new MultiHttpResource(resourceOwnerName, resources);
}
@ -571,8 +583,46 @@ public class HttpExporter extends Exporter {
}
}
/**
* Adds the {@code resources} necessary for checking and publishing cluster alerts.
*
* @param config The HTTP Exporter's configuration
* @param resourceOwnerName The resource owner name to display for any logging messages.
* @param resources The resources to add too.
*/
private static void configureClusterAlertsResources(final Config config, final String resourceOwnerName,
final List<HttpResource> resources) {
final Settings settings = config.settings();
// don't create watches if we're not using them
if (settings.getAsBoolean(CLUSTER_ALERTS_MANAGEMENT_SETTING, true)) {
final ClusterService clusterService = config.clusterService();
final List<HttpResource> watchResources = new ArrayList<>();
// add a resource per watch
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
// lazily load the cluster state to fetch the cluster UUID once it's loaded
final Supplier<String> uniqueWatchId = () -> ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId);
final Supplier<String> watch = () -> ClusterAlertsUtil.loadWatch(clusterService, watchId);
watchResources.add(new ClusterAlertHttpResource(resourceOwnerName, config.licenseState(), uniqueWatchId, watch));
}
// wrap the watches in a conditional resource check to ensure the remote cluster has watcher available / enabled
resources.add(new WatcherExistsHttpResource(resourceOwnerName, clusterService,
new MultiHttpResource(resourceOwnerName, watchResources)));
}
}
@Override
public HttpExportBulk openBulk() {
final boolean canUseClusterAlerts = config.licenseState().isMonitoringClusterAlertsAllowed();
// if this changes between updates, then we need to add OR remove the watches
if (clusterAlertsAllowed.compareAndSet(!canUseClusterAlerts, canUseClusterAlerts)) {
resource.markDirty();
}
// block until all resources are verified to exist
if (resource.checkAndPublishIfDirty(client)) {
return new HttpExportBulk(settingFQN(config), client, defaultParams, resolvers, threadContext);

View File

@ -93,7 +93,7 @@ public abstract class HttpResource {
* Mark the resource as {@linkplain #isDirty() dirty}.
*/
public final void markDirty() {
state.compareAndSet(State.CLEAN, State.DIRTY);
state.set(State.DIRTY);
}
/**
@ -138,6 +138,9 @@ public abstract class HttpResource {
* This will perform the check regardless of the {@linkplain #isDirty() dirtiness} and it will update the dirtiness.
* Using this directly can be useful if there is ever a need to double-check dirtiness without having to {@linkplain #markDirty() mark}
* it as dirty.
* <p>
* If you do mark this as dirty while this is running (e.g., asynchronously something invalidates a resource), then the resource will
* still be dirty at the end, but the success of it will still return based on the checks it ran.
*
* @param client The REST client to make the request(s).
* @return {@code true} if the resource is available for use. {@code false} to stop.
@ -152,10 +155,7 @@ public abstract class HttpResource {
try {
success = doCheckAndPublish(client);
} finally {
// nothing else should be unsetting from CHECKING
assert state.get() == State.CHECKING;
state.set(success ? State.CLEAN : State.DIRTY);
state.compareAndSet(State.CHECKING, success ? State.CLEAN : State.DIRTY);
}
return success;

View File

@ -15,12 +15,15 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* {@code PublishableHttpResource} represents an {@link HttpResource} that is a single file or object that can be checked <em>and</em>
@ -38,6 +41,8 @@ public abstract class PublishableHttpResource extends HttpResource {
/**
* The check found the resource, so nothing needs to be published.
* <p>
* This can also be used to skip the publishing portion if desired.
*/
EXISTS,
/**
@ -61,6 +66,15 @@ public abstract class PublishableHttpResource extends HttpResource {
*/
public static final Map<String, String> NO_BODY_PARAMETERS = Collections.singletonMap("filter_path", FILTER_PATH_NONE);
/**
* The default set of acceptable exists response codes for GET requests.
*/
public static final Set<Integer> GET_EXISTS = Collections.singleton(RestStatus.OK.getStatus());
/**
* The default set of <em>acceptable</em> response codes for GET requests to represent that it does NOT exist.
*/
public static final Set<Integer> GET_DOES_NOT_EXIST = Collections.singleton(RestStatus.NOT_FOUND.getStatus());
/**
* The default parameters to use for any request.
*/
@ -161,7 +175,9 @@ public abstract class PublishableHttpResource extends HttpResource {
final String resourceBasePath,
final String resourceName, final String resourceType,
final String resourceOwnerName, final String resourceOwnerType) {
return checkForResource(client, logger, resourceBasePath, resourceName, resourceType, resourceOwnerName, resourceOwnerType).v1();
return checkForResource(client, logger, resourceBasePath, resourceName, resourceType, resourceOwnerName, resourceOwnerType,
GET_EXISTS, GET_DOES_NOT_EXIST)
.v1();
}
/**
@ -171,11 +187,13 @@ public abstract class PublishableHttpResource extends HttpResource {
*
* @param client The REST client to make the request(s).
* @param logger The logger to use for status messages.
* @param resourceBasePath The base path/endpoint to check for the resource (e.g., "/_template").
* @param resourceBasePath The base path/endpoint to check for the resource (e.g., "/_template"), if any.
* @param resourceName The name of the resource (e.g., "template123").
* @param resourceType The type of resource (e.g., "monitoring template").
* @param resourceOwnerName The user-recognizeable resource owner.
* @param resourceOwnerType The type of resource owner being dealt with (e.g., "monitoring cluster").
* @param exists Response codes that represent {@code EXISTS}.
* @param doesNotExist Response codes that represent {@code DOES_NOT_EXIST}.
* @return Never {@code null} pair containing the checked response and the returned response.
* The response will only ever be {@code null} if none was returned.
* @see #simpleCheckForResource(RestClient, Logger, String, String, String, String, String)
@ -183,18 +201,28 @@ public abstract class PublishableHttpResource extends HttpResource {
protected Tuple<CheckResponse, Response> checkForResource(final RestClient client, final Logger logger,
final String resourceBasePath,
final String resourceName, final String resourceType,
final String resourceOwnerName, final String resourceOwnerType) {
final String resourceOwnerName, final String resourceOwnerType,
final Set<Integer> exists, final Set<Integer> doesNotExist) {
logger.trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
try {
final Response response = client.performRequest("GET", resourceBasePath + "/" + resourceName, parameters);
final Set<Integer> expectedResponseCodes = Sets.union(exists, doesNotExist);
// avoid exists and DNE parameters from being an exception by default
final Map<String, String> getParameters = new HashMap<>(parameters);
getParameters.put("ignore", expectedResponseCodes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
// we don't currently check for the content because we always expect it to be the same;
// if we ever make a BWC change to any template (thus without renaming it), then we need to check the content!
if (response.getStatusLine().getStatusCode() == RestStatus.OK.getStatus()) {
try {
final Response response = client.performRequest("GET", resourceBasePath + "/" + resourceName, getParameters);
final int statusCode = response.getStatusLine().getStatusCode();
// checking the content is the job of whoever called this function by checking the tuple's response
if (exists.contains(statusCode)) {
logger.debug("{} [{}] found on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
return new Tuple<>(CheckResponse.EXISTS, response);
} else if (doesNotExist.contains(statusCode)) {
logger.debug("{} [{}] does not exist on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
return new Tuple<>(CheckResponse.DOES_NOT_EXIST, response);
} else {
throw new ResponseException(response);
}
@ -202,12 +230,6 @@ public abstract class PublishableHttpResource extends HttpResource {
final Response response = e.getResponse();
final int statusCode = response.getStatusLine().getStatusCode();
// 404
if (statusCode == RestStatus.NOT_FOUND.getStatus()) {
logger.debug("{} [{}] does not exist on the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
return new Tuple<>(CheckResponse.DOES_NOT_EXIST, response);
} else {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to verify {} [{}] on the [{}] {} with status code [{}]",
resourceType, resourceName, resourceOwnerName, resourceOwnerType, statusCode),
@ -215,7 +237,6 @@ public abstract class PublishableHttpResource extends HttpResource {
// weirder failure than below; block responses just like other unexpected failures
return new Tuple<>(CheckResponse.ERROR, response);
}
} catch (IOException | RuntimeException e) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to verify {} [{}] on the [{}] {}",
@ -280,4 +301,53 @@ public abstract class PublishableHttpResource extends HttpResource {
return success;
}
/**
* Delete the {@code resourceName} using the {@code resourceBasePath} endpoint.
* <p>
* Note to callers: this will add an "ignore" parameter to the request so that 404 is not an exception and therefore considered
* successful if it's not found. You can override this behavior by specifying any valid value for "ignore", at which point 404
* responses will result in {@code false} and logged failure.
*
* @param client The REST client to make the request(s).
* @param logger The logger to use for status messages.
* @param resourceBasePath The base path/endpoint to check for the resource (e.g., "/_template").
* @param resourceName The name of the resource (e.g., "template123").
* @param resourceType The type of resource (e.g., "monitoring template").
* @param resourceOwnerName The user-recognizeable resource owner.
* @param resourceOwnerType The type of resource owner being dealt with (e.g., "monitoring cluster").
* @return {@code true} if it successfully deleted the item; otherwise {@code false}.
*/
protected boolean deleteResource(final RestClient client, final Logger logger,
final String resourceBasePath, final String resourceName,
final String resourceType,
final String resourceOwnerName, final String resourceOwnerType) {
logger.trace("deleting {} [{}] from the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
boolean success = false;
// avoid 404 being an exception by default
final Map<String, String> deleteParameters = new HashMap<>(parameters);
deleteParameters.putIfAbsent("ignore", Integer.toString(RestStatus.NOT_FOUND.getStatus()));
try {
final Response response = client.performRequest("DELETE", resourceBasePath + "/" + resourceName, deleteParameters);
final int statusCode = response.getStatusLine().getStatusCode();
// 200 or 404 (not found is just as good as deleting it!)
if (statusCode == RestStatus.OK.getStatus() || statusCode == RestStatus.NOT_FOUND.getStatus()) {
logger.debug("{} [{}] deleted from the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType);
success = true;
} else {
throw new RuntimeException("[" + resourceBasePath + "/" + resourceName + "] responded with [" + statusCode + "]");
}
} catch (IOException | RuntimeException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete {} [{}] on the [{}] {}",
resourceType, resourceName, resourceOwnerName, resourceOwnerType),
e);
}
return success;
}
}

View File

@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* {@code WatcherExistsHttpResource} checks for the availability of Watcher in the remote cluster and, if it is enabled, attempts to
* execute the {@link MultiHttpResource} assumed to contain Watches to add to the remote cluster.
*/
public class WatcherExistsHttpResource extends PublishableHttpResource {
private static final Logger logger = Loggers.getLogger(WatcherExistsHttpResource.class);
/**
* Use this to avoid getting any JSON response from a request.
*/
public static final Map<String, String> WATCHER_CHECK_PARAMETERS =
Collections.singletonMap("filter_path", "features.watcher.available,features.watcher.enabled");
/**
* Valid response codes that note explicitly that {@code _xpack} does not exist.
*/
public static final Set<Integer> XPACK_DOES_NOT_EXIST =
Sets.newHashSet(RestStatus.NOT_FOUND.getStatus(), RestStatus.BAD_REQUEST.getStatus());
/**
* The cluster service allows this check to be limited to only handling <em>elected</em> master nodes
*/
private final ClusterService clusterService;
/**
* Resources that are used if the check passes
*/
private final MultiHttpResource watches;
/**
* Create a {@link WatcherExistsHttpResource}.
*
* @param resourceOwnerName The user-recognizable name.
* @param watches The Watches to create if Watcher is available and enabled.
*/
public WatcherExistsHttpResource(final String resourceOwnerName, final ClusterService clusterService, final MultiHttpResource watches) {
// _xpack does not support master_timeout
super(resourceOwnerName, null, WATCHER_CHECK_PARAMETERS);
this.clusterService = Objects.requireNonNull(clusterService);
this.watches = Objects.requireNonNull(watches);
}
/**
* Get the Watch resources that are managed by this resource.
*
* @return Never {@code null}.
*/
public MultiHttpResource getWatches() {
return watches;
}
/**
* Determine if X-Pack is installed and, if so, if Watcher is both available <em>and</em> enabled so that it can be used.
* <p>
* If it is not both available and enabled, then we mark that it {@code EXISTS} so that no follow-on work is performed relative to
* Watcher. We do the same thing if the current node is not the elected master node.
*/
@Override
protected CheckResponse doCheck(final RestClient client) {
// only the master manages watches
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
return checkXPackForWatcher(client);
}
// not the elected master
return CheckResponse.EXISTS;
}
/**
* Reach out to the remote cluster to determine the usability of Watcher.
*
* @param client The REST client to make the request(s).
* @return Never {@code null}.
*/
private CheckResponse checkXPackForWatcher(final RestClient client) {
final Tuple<CheckResponse, Response> response =
checkForResource(client, logger,
"", "_xpack", "watcher check",
resourceOwnerName, "monitoring cluster",
GET_EXISTS,
Sets.newHashSet(RestStatus.NOT_FOUND.getStatus(), RestStatus.BAD_REQUEST.getStatus()));
final CheckResponse checkResponse = response.v1();
// if the response succeeds overall, then we have X-Pack, but we need to inspect to verify Watcher existence
if (checkResponse == CheckResponse.EXISTS) {
try {
if (canUseWatcher(response.v2(), XContentType.JSON.xContent())) {
return CheckResponse.DOES_NOT_EXIST;
}
} catch (final IOException | RuntimeException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to parse [_xpack] on the [{}]", resourceOwnerName), e);
return CheckResponse.ERROR;
}
} else if (checkResponse == CheckResponse.ERROR) {
return CheckResponse.ERROR;
}
// we return _exists_ to SKIP the work of putting Watches because WATCHER does not exist, so follow-on work cannot succeed
return CheckResponse.EXISTS;
}
/**
* Determine if Watcher exists ({@code EXISTS}) or does not exist ({@code DOES_NOT_EXIST}).
*
* @param response The filtered response from the _xpack info API
* @param xContent The XContent parser to use
* @return {@code true} represents it can be used. {@code false} that it cannot be used.
* @throws IOException if any issue occurs while parsing the {@code xContent} {@code response}.
* @throws RuntimeException if the response format is changed.
*/
private boolean canUseWatcher(final Response response, final XContent xContent) throws IOException {
// no named content used; so EMPTY is fine
final Map<String, Object> xpackInfo = XContentHelper.convertToMap(xContent, response.getEntity().getContent(), false);
// if it's empty, then there's no features.watcher response because of filter_path usage
if (xpackInfo.isEmpty() == false) {
@SuppressWarnings("unchecked")
final Map<String, Object> features = (Map<String, Object>) xpackInfo.get("features");
@SuppressWarnings("unchecked")
final Map<String, Object> watcher = (Map<String, Object>) features.get("watcher");
// if Watcher is both available _and_ enabled, then we can use it; either being true is not sufficient
if (Boolean.TRUE == watcher.get("available") && Boolean.TRUE == watcher.get("enabled")) {
return true;
}
}
return false;
}
/**
* Add Watches to the remote cluster.
*/
@Override
protected boolean doPublish(final RestClient client) {
return watches.checkAndPublish(client);
}
}

View File

@ -29,17 +29,25 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.XPackClient;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
@ -47,6 +55,12 @@ import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchRequest;
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchRequest;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -74,18 +88,20 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
private final InternalClient client;
private final ClusterService clusterService;
private final XPackLicenseState licenseState;
private final ResolversRegistry resolvers;
private final CleanerService cleanerService;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final AtomicBoolean installingSomething = new AtomicBoolean(false);
private final AtomicBoolean waitedForSetup = new AtomicBoolean(false);
private final AtomicBoolean watcherSetup = new AtomicBoolean(false);
public LocalExporter(Exporter.Config config, InternalClient client,
ClusterService clusterService, CleanerService cleanerService) {
public LocalExporter(Exporter.Config config, InternalClient client, CleanerService cleanerService) {
super(config);
this.client = client;
this.clusterService = clusterService;
this.clusterService = config.clusterService();
this.licenseState = config.licenseState();
this.cleanerService = cleanerService;
this.resolvers = new ResolversRegistry(config.settings());
@ -98,6 +114,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
clusterService.addListener(this);
cleanerService.add(this);
licenseState.addListener(this::licenseChanged);
}
ResolversRegistry getResolvers() {
@ -107,16 +124,23 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (state.get() == State.INITIALIZED) {
resolveBulk(event.state());
resolveBulk(event.state(), true);
}
}
/**
* When the license changes, we need to ensure that Watcher is setup properly.
*/
private void licenseChanged() {
watcherSetup.set(false);
}
@Override
public ExportBulk openBulk() {
if (state.get() != State.RUNNING) {
return null;
}
return resolveBulk(clusterService.state());
return resolveBulk(clusterService.state(), false);
}
@Override
@ -126,10 +150,11 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
// we also remove the listener in resolveBulk after we get to RUNNING, but it's okay to double-remove
clusterService.removeListener(this);
cleanerService.remove(this);
licenseState.removeListener(this::licenseChanged);
}
}
LocalBulk resolveBulk(ClusterState clusterState) {
LocalBulk resolveBulk(ClusterState clusterState, boolean clusterStateChange) {
if (clusterService.localNode() == null || clusterState == null) {
return null;
}
@ -157,7 +182,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
// elected master node needs to setup templates; non-master nodes need to wait for it to be setup
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
setup = setupIfElectedMaster(clusterState, templates);
setup = setupIfElectedMaster(clusterState, templates, clusterStateChange);
} else if (setupIfNotElectedMaster(clusterState, templates.keySet()) == false) {
// the first pass will be false so that we don't bother users if the master took one-go to setup
if (waitedForSetup.getAndSet(true)) {
@ -233,9 +258,11 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
*
* @param clusterState The current cluster state.
* @param templates All template names that should exist.
* @param clusterStateChange {@code true} if a cluster state change caused this call (don't block it!)
* @return {@code true} indicates that all resources are "ready" and the exporter can be used. {@code false} to stop and wait.
*/
private boolean setupIfElectedMaster(final ClusterState clusterState, final Map<String, String> templates) {
private boolean setupIfElectedMaster(final ClusterState clusterState, final Map<String, String> templates,
final boolean clusterStateChange) {
// we are on the elected master
// Check that there is nothing that could block metadata updates
if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) {
@ -301,7 +328,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse response) {
responseReceived();
responseReceived(pendingResponses, true, null);
if (response.isAcknowledged()) {
logger.info("Added modern aliases to 2.x monitoring indices {}", monitoringIndices2x);
} else {
@ -312,23 +339,28 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
@Override
public void onFailure(Exception e) {
responseReceived();
responseReceived(pendingResponses, false, null);
logger.error((Supplier<?>)
() -> new ParameterizedMessage("Unable to add modern aliases to 2.x monitoring indices {}",
monitoringIndices2x), e);
}
private void responseReceived() {
if (pendingResponses.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
}
}
}
}));
}
// avoid constantly trying to setup Watcher, which requires a lot of overhead and avoid attempting to setup during a cluster state
// change
if (state.get() == State.RUNNING && clusterStateChange == false && canUseWatcher()) {
final IndexRoutingTable watches = clusterState.routingTable().index(Watch.INDEX);
final boolean indexExists = watches != null && watches.allPrimaryShardsActive();
// we cannot do anything with watches until the index is allocated, so we wait until it's ready
if (watches != null && watches.allPrimaryShardsActive() == false) {
logger.trace("cannot manage cluster alerts because [.watches] index is not allocated");
} else if ((watches == null || indexExists) && watcherSetup.compareAndSet(false, true)) {
installClusterAlerts(indexExists, asyncActions, pendingResponses);
}
}
if (asyncActions.size() > 0) {
if (installingSomething.compareAndSet(false, true)) {
pendingResponses.set(asyncActions.size());
@ -345,6 +377,19 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return true;
}
private void responseReceived(final AtomicInteger pendingResponses, final boolean success, final @Nullable AtomicBoolean setup) {
if (setup != null && success == false) {
setup.set(false);
}
if (pendingResponses.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
}
}
}
/**
* Determine if the mapping {@code type} exists in the {@linkplain MonitoringTemplateUtils#DATA_INDEX data index}.
*
@ -455,6 +500,59 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
client.admin().indices().putTemplate(request, listener);
}
/**
* Install Cluster Alerts (Watches) into the cluster
*
* @param asyncActions Asynchronous actions are added to for each Watch.
* @param pendingResponses Pending response countdown we use to track completion.
*/
private void installClusterAlerts(final boolean indexExists, final List<Runnable> asyncActions, final AtomicInteger pendingResponses) {
final XPackClient xpackClient = new XPackClient(client);
final WatcherClient watcher = xpackClient.watcher();
final boolean canAddWatches = licenseState.isMonitoringClusterAlertsAllowed();
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId);
// we aren't sure if no watches exist yet, so add them
if (indexExists) {
if (canAddWatches) {
logger.trace("checking monitoring watch [{}]", uniqueWatchId);
asyncActions.add(() -> watcher.getWatch(new GetWatchRequest(uniqueWatchId),
new GetAndPutWatchResponseActionListener(watcher, watchId, uniqueWatchId,
pendingResponses)));
} else {
logger.trace("pruning monitoring watch [{}]", uniqueWatchId);
asyncActions.add(() -> watcher.deleteWatch(new DeleteWatchRequest(uniqueWatchId),
new ResponseActionListener<>("watch", uniqueWatchId, pendingResponses)));
}
} else if (canAddWatches) {
asyncActions.add(() -> putWatch(watcher, watchId, uniqueWatchId, pendingResponses));
}
}
}
private void putWatch(final WatcherClient watcher, final String watchId, final String uniqueWatchId,
final AtomicInteger pendingResponses) {
final String watch = ClusterAlertsUtil.loadWatch(clusterService, watchId);
logger.trace("adding monitoring watch [{}]", uniqueWatchId);
watcher.putWatch(new PutWatchRequest(uniqueWatchId, new BytesArray(watch), XContentType.JSON),
new ResponseActionListener<>("watch", uniqueWatchId, pendingResponses, watcherSetup));
}
/**
* Determine if the cluster can use Watcher.
*
* @return {@code true} to use Cluster Alerts.
*/
private boolean canUseWatcher() {
return XPackSettings.WATCHER_ENABLED.get(config.globalSettings());
}
@Override
public void onCleanUpIndices(TimeValue retention) {
if (state.get() != State.RUNNING) {
@ -566,41 +664,82 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
/**
* Acknowledge success / failure for any given creation attempt (e.g., template or pipeline).
*/
private class ResponseActionListener<Response extends AcknowledgedResponse> implements ActionListener<Response> {
private class ResponseActionListener<Response> implements ActionListener<Response> {
private final String type;
private final String name;
private final AtomicInteger countDown;
private final AtomicBoolean setup;
private ResponseActionListener(String type, String name, AtomicInteger countDown) {
this(type, name, countDown, null);
}
private ResponseActionListener(String type, String name, AtomicInteger countDown, @Nullable AtomicBoolean setup) {
this.type = Objects.requireNonNull(type);
this.name = Objects.requireNonNull(name);
this.countDown = Objects.requireNonNull(countDown);
this.setup = setup;
}
@Override
public void onResponse(Response response) {
responseReceived();
if (response.isAcknowledged()) {
responseReceived(countDown, true, setup);
if (response instanceof AcknowledgedResponse) {
if (((AcknowledgedResponse)response).isAcknowledged()) {
logger.trace("successfully set monitoring {} [{}]", type, name);
} else {
logger.error("failed to set monitoring index {} [{}]", type, name);
logger.error("failed to set monitoring {} [{}]", type, name);
}
} else {
logger.trace("successfully handled monitoring {} [{}]", type, name);
}
}
@Override
public void onFailure(Exception e) {
responseReceived();
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to set monitoring index {} [{}]", type, name), e);
responseReceived(countDown, false, setup);
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to set monitoring {} [{}]", type, name), e);
}
}
private void responseReceived() {
if (countDown.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
private class GetAndPutWatchResponseActionListener implements ActionListener<GetWatchResponse> {
private final WatcherClient watcher;
private final String watchId;
private final String uniqueWatchId;
private final AtomicInteger countDown;
private GetAndPutWatchResponseActionListener(final WatcherClient watcher,
final String watchId, final String uniqueWatchId,
final AtomicInteger countDown) {
this.watcher = Objects.requireNonNull(watcher);
this.watchId = Objects.requireNonNull(watchId);
this.uniqueWatchId = Objects.requireNonNull(uniqueWatchId);
this.countDown = Objects.requireNonNull(countDown);
}
@Override
public void onResponse(GetWatchResponse response) {
if (response.isFound()) {
logger.trace("found monitoring watch [{}]", uniqueWatchId);
responseReceived(countDown, true, watcherSetup);
} else {
putWatch(watcher, watchId, uniqueWatchId, countDown);
}
}
@Override
public void onFailure(Exception e) {
responseReceived(countDown, false, watcherSetup);
if ((e instanceof IndexNotFoundException) == false) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to get monitoring watch [{}]", uniqueWatchId), e);
}
}
}
}

View File

@ -47,7 +47,12 @@ public class ReservedRolesStore {
RoleDescriptor.IndicesPrivileges.builder().indices(".marvel-es-*", ".monitoring-*").privileges("read").build() },
null, MetadataUtils.DEFAULT_RESERVED_METADATA))
.put("remote_monitoring_agent", new RoleDescriptor("remote_monitoring_agent",
new String[] { "manage_index_templates", "manage_ingest_pipelines", "monitor" },
new String[] {
"manage_index_templates", "manage_ingest_pipelines", "monitor",
"cluster:admin/xpack/watcher/watch/get",
"cluster:admin/xpack/watcher/watch/put",
"cluster:admin/xpack/watcher/watch/delete",
},
new RoleDescriptor.IndicesPrivileges[] {
RoleDescriptor.IndicesPrivileges.builder().indices(".marvel-es-*", ".monitoring-*").privileges("all").build() },
null, MetadataUtils.DEFAULT_RESERVED_METADATA))

View File

@ -0,0 +1,110 @@
{
"metadata": {
"name": "X-Pack Monitoring: Cluster Status (${monitoring.watch.cluster_uuid})",
"xpack": {
"alert_index": ".monitoring-alerts-2",
"cluster_uuid": "${monitoring.watch.cluster_uuid}",
"link": "elasticsearch/indices",
"severity": 2100,
"type": "monitoring",
"version": "5.4.0",
"watch": "${monitoring.watch.id}"
}
},
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"chain": {
"inputs": [
{
"check": {
"search": {
"request": {
"indices": [
".monitoring-es-2-*"
],
"body": {
"size": 1,
"sort": [
{
"timestamp": {
"order": "desc"
}
}
],
"_source": [
"cluster_state.status"
],
"query": {
"bool": {
"filter": [
{
"term": {
"cluster_uuid": "${monitoring.watch.cluster_uuid}"
}
},
{
"term": {
"_type": "cluster_state"
}
}
]
}
}
}
}
}
}
},
{
"alert": {
"search": {
"request": {
"indices": [
".monitoring-alerts-2"
],
"body": {
"size": 1,
"terminate_after": 1,
"query": {
"bool": {
"filter": {
"term": {
"_id": "${monitoring.watch.unique_id}"
}
}
}
}
}
}
}
}
}
]
}
},
"condition": {
"script": {
"inline":
"ctx.vars.fails_check = ctx.payload.check.hits.total != 0 && ctx.payload.check.hits.hits[0]._source.cluster_state.status != 'green';ctx.vars.not_resolved = ctx.payload.alert.hits.total == 1 && ctx.payload.alert.hits.hits[0]._source.resolved_timestamp == null;return ctx.vars.fails_check || ctx.vars.not_resolved"
}
},
"transform": {
"script": {
"inline":
"def state = 'red';if (ctx.vars.fails_check){state = ctx.payload.check.hits.hits[0]._source.cluster_state.status;}if (ctx.vars.not_resolved){ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check == false) {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = ['timestamp': ctx.execution_time, 'metadata': ctx.metadata.xpack];}if (ctx.vars.fails_check) {ctx.payload.prefix = 'Elasticsearch cluster status is ' + state + '.';if (state == 'red') {ctx.payload.message = 'Allocate missing primary shards and replica shards.';ctx.payload.metadata.severity = 2100;} else {ctx.payload.message = 'Allocate missing replica shards.';ctx.payload.metadata.severity = 1100;}}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {
"trigger_alert": {
"index": {
"index": ".monitoring-alerts-2",
"doc_type": "doc",
"doc_id": "${monitoring.watch.unique_id}"
}
}
}
}

View File

@ -0,0 +1,103 @@
{
"metadata": {
"name": "X-Pack Monitoring: Elasticsearch Version Mismatch (${monitoring.watch.cluster_uuid})",
"xpack": {
"alert_index": ".monitoring-alerts-2",
"cluster_uuid": "${monitoring.watch.cluster_uuid}",
"link": "elasticsearch/nodes",
"severity": 1000,
"type": "monitoring",
"watch": "${monitoring.watch.id}"
}
},
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"chain": {
"inputs": [
{
"check": {
"search": {
"request": {
"indices": [
".monitoring-es-2-*"
],
"body": {
"size": 1,
"_source": [
"cluster_stats.nodes.versions"
],
"query": {
"bool": {
"filter": [
{
"term": {
"_id": "${monitoring.watch.cluster_uuid}"
}
},
{
"term": {
"_type": "cluster_stats"
}
}
]
}
},
"sort": [
"timestamp"
]
}
}
}
}
},
{
"alert": {
"search": {
"request": {
"indices": [
".monitoring-alerts-2"
],
"body": {
"size": 1,
"terminate_after": 1,
"query": {
"bool": {
"filter": {
"term": {
"_id": "${monitoring.watch.unique_id}"
}
}
}
}
}
}
}
}
}
]
}
},
"condition": {
"script": {
"inline": "ctx.vars.fails_check = ctx.payload.check.hits.total != 0 && ctx.payload.check.hits.hits[0]._source.cluster_stats.nodes.versions.size() != 1;ctx.vars.not_resolved = ctx.payload.alert.hits.total == 1 && ctx.payload.alert.hits.hits[0]._source.resolved_timestamp == null;return ctx.vars.fails_check || ctx.vars.not_resolved;"
}
},
"transform": {
"script": {
"inline": "def versionMessage = null;if (ctx.vars.fails_check) {def versions = new ArrayList(ctx.payload.check.hits.hits[0]._source.cluster_stats.nodes.versions);Collections.sort(versions);versionMessage = 'Versions: [' + String.join(', ', versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Elasticsearch.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {
"trigger_alert": {
"index": {
"index": ".monitoring-alerts-2",
"doc_type": "doc",
"doc_id": "${monitoring.watch.unique_id}"
}
}
}
}

View File

@ -0,0 +1,130 @@
{
"metadata": {
"name": "X-Pack Monitoring: Kibana Version Mismatch (${monitoring.watch.cluster_uuid})",
"xpack": {
"alert_index": ".monitoring-alerts-2",
"cluster_uuid": "${monitoring.watch.cluster_uuid}",
"link": "kibana/instances",
"severity": 1000,
"type": "monitoring",
"watch": "${monitoring.watch.id}"
}
},
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"chain": {
"inputs": [
{
"check": {
"search": {
"request": {
"indices": [
".monitoring-kibana-2-*"
],
"body": {
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"cluster_uuid": "${monitoring.watch.cluster_uuid}"
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
},
{
"term": {
"_type": "kibana_stats"
}
}
]
}
},
"aggs": {
"group_by_kibana": {
"terms": {
"field": "kibana_stats.kibana.uuid",
"size": 1000
},
"aggs": {
"group_by_version": {
"terms": {
"field": "kibana_stats.kibana.version",
"size": 1,
"order": {
"latest_report": "desc"
}
},
"aggs": {
"latest_report": {
"max": {
"field": "timestamp"
}
}
}
}
}
}
}
}
}
}
}
},
{
"alert": {
"search": {
"request": {
"indices": [
".monitoring-alerts-2"
],
"body": {
"size": 1,
"terminate_after": 1,
"query": {
"bool": {
"filter": {
"term": {
"_id": "${monitoring.watch.unique_id}"
}
}
}
}
}
}
}
}
}
]
}
},
"condition": {
"script": {
"inline": "ctx.vars.fails_check = false;if (ctx.payload.check.hits.total != 0 && ctx.payload.check.aggregations.group_by_kibana.buckets.size() > 1) {def versions = new HashSet();for (def kibana : ctx.payload.check.aggregations.group_by_kibana.buckets) {if (kibana.group_by_version.buckets.size() != 0) {versions.add(kibana.group_by_version.buckets[0].key);}}if (versions.size() > 1) {ctx.vars.fails_check = true;ctx.vars.versions = new ArrayList(versions);Collections.sort(ctx.vars.versions);}}ctx.vars.not_resolved = ctx.payload.alert.hits.total == 1 && ctx.payload.alert.hits.hits[0]._source.resolved_timestamp == null;return ctx.vars.fails_check || ctx.vars.not_resolved;"
}
},
"transform": {
"script": {
"inline": "def versionMessage = null;if (ctx.vars.fails_check) {versionMessage = 'Versions: [' + String.join(', ', ctx.vars.versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Kibana.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {
"trigger_alert": {
"index": {
"index": ".monitoring-alerts-2",
"doc_type": "doc",
"doc_id": "${monitoring.watch.unique_id}"
}
}
}
}

View File

@ -0,0 +1,130 @@
{
"metadata": {
"name": "X-Pack Monitoring: Logstash Version Mismatch (${monitoring.watch.cluster_uuid})",
"xpack": {
"alert_index": ".monitoring-alerts-2",
"cluster_uuid": "${monitoring.watch.cluster_uuid}",
"link": "logstash/instances",
"severity": 1000,
"type": "monitoring",
"watch": "${monitoring.watch.id}"
}
},
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"chain": {
"inputs": [
{
"check": {
"search": {
"request": {
"indices": [
".monitoring-logstash-2-*"
],
"body": {
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"cluster_uuid": "${monitoring.watch.cluster_uuid}"
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
},
{
"term": {
"_type": "logstash_stats"
}
}
]
}
},
"aggs": {
"group_by_logstash": {
"terms": {
"field": "logstash_stats.logstash.uuid",
"size": 1000
},
"aggs": {
"group_by_version": {
"terms": {
"field": "logstash_stats.logstash.version",
"size": 1,
"order": {
"latest_report": "desc"
}
},
"aggs": {
"latest_report": {
"max": {
"field": "timestamp"
}
}
}
}
}
}
}
}
}
}
}
},
{
"alert": {
"search": {
"request": {
"indices": [
".monitoring-alerts-2"
],
"body": {
"size": 1,
"terminate_after": 1,
"query": {
"bool": {
"filter": {
"term": {
"_id": "${monitoring.watch.unique_id}"
}
}
}
}
}
}
}
}
}
]
}
},
"condition": {
"script": {
"inline": "ctx.vars.fails_check = false;if (ctx.payload.check.hits.total != 0 && ctx.payload.check.aggregations.group_by_logstash.buckets.size() > 1) {def versions = new HashSet();for (def logstash : ctx.payload.check.aggregations.group_by_logstash.buckets) {if (logstash.group_by_version.buckets.size() != 0) {versions.add(logstash.group_by_version.buckets[0].key);}}if (versions.size() > 1) {ctx.vars.fails_check = true;ctx.vars.versions = new ArrayList(versions);Collections.sort(ctx.vars.versions);}}ctx.vars.not_resolved = ctx.payload.alert.hits.total == 1 && ctx.payload.alert.hits.hits[0]._source.resolved_timestamp == null;return ctx.vars.fails_check || ctx.vars.not_resolved;"
}
},
"transform": {
"script": {
"inline": "def versionMessage = null;if (ctx.vars.fails_check) {versionMessage = 'Versions: [' + String.join(', ', ctx.vars.versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Logstash.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {
"trigger_alert": {
"index": {
"index": ".monitoring-alerts-2",
"doc_type": "doc",
"doc_id": "${monitoring.watch.unique_id}"
}
}
}
}

View File

@ -10,9 +10,9 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.xpack.ml.action.CloseJobAction.Response;
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
@ -35,6 +35,7 @@ public class MonitoringServiceTests extends ESTestCase {
TestThreadPool threadPool;
MonitoringService monitoringService;
XPackLicenseState licenseState = mock(XPackLicenseState.class);
ClusterService clusterService;
ClusterSettings clusterSettings;
@ -104,20 +105,15 @@ public class MonitoringServiceTests extends ESTestCase {
Settings settings = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), MonitoringSettings.MIN_INTERVAL).build();
monitoringService = new MonitoringService(settings, clusterSettings, threadPool, emptySet(), exporter);
logger.debug("start the monitoring service");
monitoringService.start();
assertBusy(() -> assertTrue(monitoringService.isStarted()));
logger.debug("wait for the monitoring execution to be started");
assertBusy(() -> assertThat(exporter.getExportsCount(), equalTo(1)));
logger.debug("cancel current execution to avoid further execution once the latch is unblocked");
monitoringService.cancelExecution();
logger.debug("unblock the exporter");
latch.countDown();
logger.debug("verify that it hasn't been called more than one");
assertThat(exporter.getExportsCount(), equalTo(1));
}
@ -126,7 +122,7 @@ public class MonitoringServiceTests extends ESTestCase {
private final AtomicInteger exports = new AtomicInteger(0);
CountingExporter() {
super(Settings.EMPTY, Collections.emptyMap(), clusterService, threadPool.getThreadContext());
super(Settings.EMPTY, Collections.emptyMap(), clusterService, licenseState, threadPool.getThreadContext());
}
@Override

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
@ -64,6 +65,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Mockito.mock;
public class TransportMonitoringBulkActionTests extends ESTestCase {
@ -73,6 +75,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
public ExpectedException expectedException = ExpectedException.none();
private ClusterService clusterService;
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
private TransportService transportService;
private CapturingExporters exportService;
private TransportMonitoringBulkAction action;
@ -271,7 +274,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private final Collection<MonitoringDoc> exported = ConcurrentCollections.newConcurrentSet();
CapturingExporters() {
super(Settings.EMPTY, Collections.emptyMap(), clusterService, threadPool.getThreadContext());
super(Settings.EMPTY, Collections.emptyMap(), clusterService, licenseState, threadPool.getThreadContext());
}
@Override
@ -293,7 +296,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private final Consumer<Collection<? extends MonitoringDoc>> consumer;
ConsumingExporters(Consumer<Collection<? extends MonitoringDoc>> consumer) {
super(Settings.EMPTY, Collections.emptyMap(), clusterService, threadPool.getThreadContext());
super(Settings.EMPTY, Collections.emptyMap(), clusterService, licenseState, threadPool.getThreadContext());
this.consumer = consumer;
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import org.junit.Before;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests {@link ClusterAlertsUtil}.
*/
public class ClusterAlertsUtilTests extends ESTestCase {
private final ClusterService clusterService = mock(ClusterService.class);
private final ClusterState clusterState = mock(ClusterState.class);
private final MetaData metaData = mock(MetaData.class);
private final String clusterUuid = randomAlphaOfLength(16);
@Before
public void setup() {
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.metaData()).thenReturn(metaData);
when(metaData.clusterUUID()).thenReturn(clusterUuid);
}
public void testWatchIdsAreAllUnique() {
final List<String> watchIds = Arrays.asList(ClusterAlertsUtil.WATCH_IDS);
assertThat(watchIds, hasSize(new HashSet<>(watchIds).size()));
}
public void testCreateUniqueWatchId() {
final String watchId = randomFrom(ClusterAlertsUtil.WATCH_IDS);
final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId);
assertThat(uniqueWatchId, equalTo(clusterUuid + "_" + watchId));
}
public void testLoadWatch() throws IOException {
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
final String watch = ClusterAlertsUtil.loadWatch(clusterService, watchId);
assertThat(watch, notNullValue());
assertThat(watch, containsString(clusterUuid));
assertThat(watch, containsString(watchId));
assertThat(watch, containsString(clusterUuid + "_" + watchId));
// validate that it's well formed JSON
assertThat(XContentHelper.convertToMap(XContentType.JSON.xContent(), watch, false), notNullValue());
}
}
public void testLoadWatchFails() {
expectThrows(RuntimeException.class, () -> ClusterAlertsUtil.loadWatch(clusterService, "watch-does-not-exist"));
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -15,6 +16,7 @@ import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
@ -46,6 +48,7 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -53,6 +56,8 @@ public class ExportersTests extends ESTestCase {
private Exporters exporters;
private Map<String, Exporter.Factory> factories;
private ClusterService clusterService;
private ClusterState state;
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
private ClusterSettings clusterSettings;
private ThreadContext threadContext;
@ -67,14 +72,17 @@ public class ExportersTests extends ESTestCase {
when(threadPool.getThreadContext()).thenReturn(threadContext);
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
clusterService = mock(ClusterService.class);
// default state.version() will be 0, which is "valid"
state = mock(ClusterState.class);
clusterSettings = new ClusterSettings(Settings.EMPTY,
new HashSet<>(Arrays.asList(MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
when(clusterService.state()).thenReturn(state);
// we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, internalClient, clusterService, mock(CleanerService.class)));
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, internalClient, mock(CleanerService.class)));
exporters = new Exporters(Settings.EMPTY, factories, clusterService, threadContext);
exporters = new Exporters(Settings.EMPTY, factories, clusterService, licenseState, threadContext);
}
public void testInitExportersDefault() throws Exception {
@ -175,7 +183,7 @@ public class ExportersTests extends ESTestCase {
clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Arrays.asList(MonitoringSettings.EXPORTERS_SETTINGS)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
exporters = new Exporters(nodeSettings, factories, clusterService, threadContext) {
exporters = new Exporters(nodeSettings, factories, clusterService, licenseState, threadContext) {
@Override
Map<String, Exporter> initExporters(Settings settings) {
settingsHolder.set(settings);
@ -212,6 +220,21 @@ public class ExportersTests extends ESTestCase {
assertThat(json, containsString("\"processors\":[]"));
}
public void testExporterBlocksOnClusterState() {
when(state.version()).thenReturn(ClusterState.UNKNOWN_VERSION);
final int nbExporters = randomIntBetween(1, 5);
final Settings.Builder settings = Settings.builder();
for (int i = 0; i < nbExporters; i++) {
settings.put("xpack.monitoring.exporters._name" + String.valueOf(i) + ".type", "record");
}
final Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext);
assertThat(exporters.openBulk(), nullValue());
}
/**
* This test creates N threads that export a random number of document
* using a {@link Exporters} instance.
@ -219,7 +242,6 @@ public class ExportersTests extends ESTestCase {
public void testConcurrentExports() throws Exception {
final int nbExporters = randomIntBetween(1, 5);
logger.info("--> creating {} exporters", nbExporters);
Settings.Builder settings = Settings.builder();
for (int i = 0; i < nbExporters; i++) {
settings.put("xpack.monitoring.exporters._name" + String.valueOf(i) + ".type", "record");
@ -227,7 +249,7 @@ public class ExportersTests extends ESTestCase {
factories.put("record", (s) -> new CountingExporter(s, threadContext));
Exporters exporters = new Exporters(settings.build(), factories, clusterService, threadContext);
Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext);
exporters.start();
final Thread[] threads = new Thread[3 + randomInt(7)];
@ -236,7 +258,6 @@ public class ExportersTests extends ESTestCase {
int total = 0;
logger.info("--> exporting documents using {} threads", threads.length);
for (int i = 0; i < threads.length; i++) {
int nbDocs = randomIntBetween(10, 50);
total += nbDocs;
@ -244,11 +265,9 @@ public class ExportersTests extends ESTestCase {
final int threadNum = i;
final int threadDocs = nbDocs;
logger.debug("--> exporting thread [{}] exports {} documents", threadNum, threadDocs);
threads[i] = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.error("unexpected error in exporting thread", e);
exceptions.add(e);
}
@ -270,7 +289,6 @@ public class ExportersTests extends ESTestCase {
threads[i].start();
}
logger.info("--> waiting for threads to exports {} documents", total);
for (Thread thread : threads) {
thread.join();
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import java.util.HashMap;
import org.apache.http.HttpEntity;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
@ -13,14 +14,19 @@ import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.CheckResponse;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.GET_DOES_NOT_EXIST;
import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.GET_EXISTS;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@ -40,7 +46,7 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
/**
* Perform {@link PublishableHttpResource#doCheck(RestClient) doCheck} against the {@code resource} and assert that it returns
* {@code true} given a {@link RestStatus} that is {@link RestStatus#OK}.
* {@code EXISTS} given a {@link RestStatus} that is {@link RestStatus#OK}.
*
* @param resource The resource to execute.
* @param resourceBasePath The base endpoint (e.g., "/_template")
@ -53,7 +59,7 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
/**
* Perform {@link PublishableHttpResource#doCheck(RestClient) doCheck} against the {@code resource} and assert that it returns
* {@code false} given a {@link RestStatus} that is not {@link RestStatus#OK}.
* {@code DOES_NOT_EXIST} given a {@link RestStatus} that is not {@link RestStatus#OK}.
*
* @param resource The resource to execute.
* @param resourceBasePath The base endpoint (e.g., "/_template")
@ -66,7 +72,7 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
/**
* Perform {@link PublishableHttpResource#doCheck(RestClient) doCheck} against the {@code resource} that throws an exception and assert
* that it returns {@code false}.
* that it returns {@code ERROR}.
*
* @param resource The resource to execute.
* @param resourceBasePath The base endpoint (e.g., "/_template")
@ -79,7 +85,43 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
final ResponseException responseException = responseException("GET", endpoint, failedCheckStatus());
final Exception e = randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
when(client.performRequest("GET", endpoint, resource.getParameters())).thenThrow(e);
when(client.performRequest("GET", endpoint, getParameters(resource.getParameters()))).thenThrow(e);
assertThat(resource.doCheck(client), is(CheckResponse.ERROR));
}
/**
* Perform {@link PublishableHttpResource#doCheck(RestClient) doCheck} against the {@code resource}, expecting a {@code DELETE}, and
* assert that it returns {@code EXISTS} given a {@link RestStatus} that is {@link RestStatus#OK} or {@link RestStatus#NOT_FOUND}.
*
* @param resource The resource to execute.
* @param resourceBasePath The base endpoint (e.g., "/_template")
* @param resourceName The resource name (e.g., the template or pipeline name).
*/
protected void assertCheckAsDeleteExists(final PublishableHttpResource resource,
final String resourceBasePath, final String resourceName)
throws IOException {
final RestStatus status = randomFrom(successfulCheckStatus(), notFoundCheckStatus());
doCheckAsDeleteWithStatusCode(resource, resourceBasePath, resourceName, status, CheckResponse.EXISTS);
}
/**
* Perform {@link PublishableHttpResource#doCheck(RestClient) doCheck} against the {@code resource} that throws an exception and assert
* that it returns {@code ERRPR} when performing a {@code DELETE} rather than the more common {@code GET}.
*
* @param resource The resource to execute.
* @param resourceBasePath The base endpoint (e.g., "/_template")
* @param resourceName The resource name (e.g., the template or pipeline name).
*/
protected void assertCheckAsDeleteWithException(final PublishableHttpResource resource,
final String resourceBasePath, final String resourceName)
throws IOException {
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final ResponseException responseException = responseException("DELETE", endpoint, failedCheckStatus());
final Exception e = randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
when(client.performRequest("DELETE", endpoint, deleteParameters(resource.getParameters()))).thenThrow(e);
assertThat(resource.doCheck(client), is(CheckResponse.ERROR));
}
@ -135,29 +177,44 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
}
protected void assertParameters(final PublishableHttpResource resource) {
final Map<String, String> parameters = resource.getParameters();
final Map<String, String> parameters = new HashMap<>(resource.getParameters());
if (masterTimeout != null) {
assertThat(parameters.get("master_timeout"), is(masterTimeout.toString()));
assertThat(parameters.remove("master_timeout"), is(masterTimeout.toString()));
}
assertThat(parameters.get("filter_path"), is("$NONE"));
assertThat(parameters.remove("filter_path"), is("$NONE"));
assertThat(parameters.isEmpty(), is(true));
}
protected void doCheckWithStatusCode(final PublishableHttpResource resource, final String resourceBasePath, final String resourceName,
final RestStatus status,
final CheckResponse expected)
throws IOException {
doCheckWithStatusCode(resource, resourceBasePath, resourceName, status, GET_EXISTS, GET_DOES_NOT_EXIST, expected);
}
protected void doCheckWithStatusCode(final PublishableHttpResource resource, final String resourceBasePath, final String resourceName,
final RestStatus status, final Set<Integer> exists, final Set<Integer> doesNotExist,
final CheckResponse expected)
throws IOException {
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final Response response = response("GET", endpoint, status);
doCheckWithStatusCode(resource, endpoint, expected, response);
doCheckWithStatusCode(resource, getParameters(resource.getParameters(), exists, doesNotExist), endpoint, expected, response);
}
protected void doCheckWithStatusCode(final PublishableHttpResource resource, final String endpoint, final CheckResponse expected,
final Response response)
throws IOException {
when(client.performRequest("GET", endpoint, resource.getParameters())).thenReturn(response);
doCheckWithStatusCode(resource, getParameters(resource.getParameters()), endpoint, expected, response);
}
protected void doCheckWithStatusCode(final PublishableHttpResource resource, final Map<String, String> expectedParameters,
final String endpoint, final CheckResponse expected,
final Response response)
throws IOException {
when(client.performRequest("GET", endpoint, expectedParameters)).thenReturn(response);
assertThat(resource.doCheck(client), is(expected));
}
@ -175,6 +232,26 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
assertThat(resource.doPublish(client), is(expected));
}
protected void doCheckAsDeleteWithStatusCode(final PublishableHttpResource resource,
final String resourceBasePath, final String resourceName,
final RestStatus status,
final CheckResponse expected)
throws IOException {
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final Response response = response("DELETE", endpoint, status);
doCheckAsDeleteWithStatusCode(resource, endpoint, expected, response);
}
protected void doCheckAsDeleteWithStatusCode(final PublishableHttpResource resource,
final String endpoint, final CheckResponse expected,
final Response response)
throws IOException {
when(client.performRequest("DELETE", endpoint, deleteParameters(resource.getParameters()))).thenReturn(response);
assertThat(resource.doCheck(client), is(expected));
}
protected RestStatus successfulCheckStatus() {
return RestStatus.OK;
}
@ -202,6 +279,10 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
}
protected Response response(final String method, final String endpoint, final RestStatus status) {
return response(method, endpoint, status, null);
}
protected Response response(final String method, final String endpoint, final RestStatus status, final HttpEntity entity) {
final Response response = mock(Response.class);
// fill out the response enough so that the exception can be constructed
final RequestLine requestLine = mock(RequestLine.class);
@ -213,6 +294,8 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
when(response.getRequestLine()).thenReturn(requestLine);
when(response.getStatusLine()).thenReturn(statusLine);
when(response.getEntity()).thenReturn(entity);
return response;
}
@ -224,4 +307,26 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase
}
}
protected Map<String, String> getParameters(final Map<String, String> parameters) {
return getParameters(parameters, GET_EXISTS, GET_DOES_NOT_EXIST);
}
protected Map<String, String> getParameters(final Map<String, String> parameters,
final Set<Integer> exists, final Set<Integer> doesNotExist) {
final Set<Integer> statusCodes = Sets.union(exists, doesNotExist);
final Map<String, String> parametersWithIgnore = new HashMap<>(parameters);
parametersWithIgnore.putIfAbsent("ignore", statusCodes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
return parametersWithIgnore;
}
protected Map<String, String> deleteParameters(final Map<String, String> parameters) {
final Map<String, String> parametersWithIgnore = new HashMap<>(parameters);
parametersWithIgnore.putIfAbsent("ignore", "404");
return parametersWithIgnore;
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests {@link ClusterAlertHttpResource}.
*/
public class ClusterAlertHttpResourceTests extends AbstractPublishableHttpResourceTestCase {
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
private final String watchId = randomFrom(ClusterAlertsUtil.WATCH_IDS);
private final String watchValue = "{\"totally-valid\":{}}";
private final ClusterAlertHttpResource resource = new ClusterAlertHttpResource(owner, licenseState, () -> watchId, () -> watchValue);
public void testWatchToHttpEntity() throws IOException {
final byte[] watchValueBytes = watchValue.getBytes(ContentType.APPLICATION_JSON.getCharset());
final byte[] actualBytes = new byte[watchValueBytes.length];
final HttpEntity entity = resource.watchToHttpEntity();
assertThat(entity.getContentType().getValue(), is(ContentType.APPLICATION_JSON.toString()));
final InputStream byteStream = entity.getContent();
assertThat(byteStream.available(), is(watchValueBytes.length));
assertThat(byteStream.read(actualBytes), is(watchValueBytes.length));
assertArrayEquals(watchValueBytes, actualBytes);
assertThat(byteStream.available(), is(0));
}
public void testDoCheckGetWatchExists() throws IOException {
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(true);
assertCheckExists(resource, "/_xpack/watcher/watch", watchId);
}
public void testDoCheckGetWatchDoesNotExist() throws IOException {
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(true);
assertCheckDoesNotExist(resource, "/_xpack/watcher/watch", watchId);
}
public void testDoCheckWithExceptionGetWatchError() throws IOException {
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(true);
assertCheckWithException(resource, "/_xpack/watcher/watch", watchId);
}
public void testDoCheckAsDeleteWatchExists() throws IOException {
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(false);
assertCheckAsDeleteExists(resource, "/_xpack/watcher/watch", watchId);
}
public void testDoCheckWithExceptionAsDeleteWatchError() throws IOException {
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(false);
assertCheckAsDeleteWithException(resource, "/_xpack/watcher/watch", watchId);
}
public void testDoPublishTrue() throws IOException {
assertPublishSucceeds(resource, "/_xpack/watcher/watch", watchId, StringEntity.class);
}
public void testDoPublishFalse() throws IOException {
assertPublishFails(resource, "/_xpack/watcher/watch", watchId, StringEntity.class);
}
public void testDoPublishFalseWithException() throws IOException {
assertPublishWithException(resource, "/_xpack/watcher/watch", watchId, StringEntity.class);
}
public void testParameters() {
final Map<String, String> parameters = new HashMap<>(resource.getParameters());
assertThat(parameters.remove("filter_path"), is("$NONE"));
assertThat(parameters.isEmpty(), is(true));
}
}

View File

@ -59,12 +59,14 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils.DATA_INDEX;
import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.FILTER_PATH_NONE;
import static org.elasticsearch.xpack.monitoring.exporter.http.WatcherExistsHttpResource.WATCHER_CHECK_PARAMETERS;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
@ -72,6 +74,15 @@ import static org.hamcrest.Matchers.startsWith;
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class HttpExporterIT extends MonitoringIntegTestCase {
private final boolean typeMappingsExistAlready = randomBoolean();
private final boolean templatesExistsAlready = randomBoolean();
private final boolean pipelineExistsAlready = randomBoolean();
private final boolean remoteClusterAllowsWatcher = randomBoolean();
private final boolean currentLicenseAllowsWatcher = true;
private final boolean watcherAlreadyExists = randomBoolean();
private final boolean bwcIndexesExist = randomBoolean();
private final boolean bwcAliasesExist = randomBoolean();
private MockWebServer webServer;
@Before
@ -90,39 +101,30 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
public void testExport() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
final Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer));
internalCluster().startNode(builder);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
assertBulk(webServer, nbDocs);
}
public void testExportWithHeaders() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
final String headerValue = randomAlphaOfLengthBetween(3, 9);
final String[] array = generateRandomStringArray(2, 4, false);
@ -132,11 +134,6 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
headers.put("X-Found-Cluster", new String[] { headerValue });
headers.put("Array-Check", array);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer))
@ -146,11 +143,19 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
internalCluster().startNode(builder);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist,
headers, null);
assertBulk(webServer, nbDocs, headers, null);
@ -158,11 +163,6 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
public void testExportWithBasePath() throws Exception {
final boolean useHeaders = randomBoolean();
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
final String headerValue = randomAlphaOfLengthBetween(3, 9);
final String[] array = generateRandomStringArray(2, 4, false);
@ -175,11 +175,6 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
headers.put("Array-Check", array);
}
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
String basePath = "path/to";
if (randomBoolean()) {
@ -207,38 +202,44 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
internalCluster().startNode(builder);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs));
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist,
headers, basePath);
assertBulk(webServer, nbDocs, headers, basePath);
}
public void testHostChangeReChecksTemplate() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer));
internalCluster().startNode(builder);
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false}");
internalCluster().startNode(builder);
export(Collections.singletonList(newRandomMonitoringDoc()));
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready, bwcIndexesExist, bwcAliasesExist);
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
assertBulk(webServer);
try (MockWebServer secondWebServer = createMockWebServer()) {
@ -259,9 +260,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
// opposite of if it existed before
enqueuePipelineResponses(secondWebServer, !pipelineExistsAlready);
enqueueBackwardsCompatibilityAliasResponse(secondWebServer, bwcIndexesExist, true);
enqueueWatcherResponses(secondWebServer, remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists);
enqueueResponse(secondWebServer, 200, "{\"errors\": false}");
logger.info("--> exporting a second event");
// second event
export(Collections.singletonList(newRandomMonitoringDoc()));
assertMonitorVersion(secondWebServer);
@ -283,6 +285,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
assertMonitorPipelines(secondWebServer, !pipelineExistsAlready, null, null);
assertMonitorBackwardsCompatibilityAliases(secondWebServer, false, null, null);
assertMonitorWatches(secondWebServer, remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
null, null);
assertBulk(secondWebServer);
}
}
@ -307,12 +311,6 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
public void testDynamicIndexFormatChange() throws Exception {
final boolean typeMappingsExistAlready = randomBoolean();
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final boolean bwcIndexesExist = randomBoolean();
final boolean bwcAliasesExist = randomBoolean();
Settings.Builder builder = Settings.builder().put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer));
@ -322,6 +320,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
@ -330,6 +329,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertMonitorResources(webServer,
typeMappingsExistAlready, templatesExistsAlready, pipelineExistsAlready,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist);
MockRequest recordedRequest = assertBulk(webServer);
@ -347,7 +347,9 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
.setTransientSettings(Settings.builder().put("xpack.monitoring.exporters._http.index.name.time_format", newTimeFormat)));
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer, true, true, true, false, false);
enqueueSetupResponses(webServer, true, true, true,
true, true, true,
false, false);
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
doc = newRandomMonitoringDoc();
@ -356,7 +358,9 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
String expectedMonitoringIndex = ".monitoring-es-" + MonitoringTemplateUtils.TEMPLATE_VERSION + "-"
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp());
assertMonitorResources(webServer, true, true, true, false, false);
assertMonitorResources(webServer, true, true, true,
true, true, true,
false, false);
recordedRequest = assertBulk(webServer);
bytes = recordedRequest.getBody().getBytes(StandardCharsets.UTF_8);
@ -386,14 +390,19 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
private void assertMonitorResources(final MockWebServer webServer,
final boolean typeMappingsExistAlready,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists,
final boolean remoteClusterAllowsWatcher, final boolean currentLicenseAllowsWatcher,
final boolean watcherAlreadyExists,
final boolean bwcIndexesExist, final boolean bwcAliasesExist) throws Exception {
assertMonitorResources(webServer, typeMappingsExistAlready, templateAlreadyExists, pipelineAlreadyExists,
remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
bwcIndexesExist, bwcAliasesExist, null, null);
}
private void assertMonitorResources(final MockWebServer webServer,
final boolean typeMappingsExistAlready,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists,
final boolean remoteClusterAllowsWatcher, final boolean currentLicenseAllowsWatcher,
final boolean watcherAlreadyExists,
boolean bwcIndexesExist, boolean bwcAliasesExist,
@Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
@ -402,6 +411,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertMonitorTemplates(webServer, templateAlreadyExists, customHeaders, basePath);
assertMonitorPipelines(webServer, pipelineAlreadyExists, customHeaders, basePath);
assertMonitorBackwardsCompatibilityAliases(webServer, bwcIndexesExist && false == bwcAliasesExist, customHeaders, basePath);
assertMonitorWatches(webServer, remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists,
customHeaders, basePath);
}
private void assertMonitorMappingTypes(final MockWebServer webServer,
@ -479,6 +490,53 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
}
private void assertMonitorWatches(final MockWebServer webServer,
final boolean remoteClusterAllowsWatcher, final boolean currentLicenseAllowsWatcher,
final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
MockRequest request;
request = webServer.takeRequest();
// GET /_xpack
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack"));
assertThat(request.getUri().getQuery(), equalTo(watcherCheckQueryString()));
assertHeaders(request, customHeaders);
if (remoteClusterAllowsWatcher) {
for (Tuple<String, String> watch : monitoringWatches()) {
request = webServer.takeRequest();
// GET / PUT if we are allowed to use it
if (currentLicenseAllowsWatcher) {
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + watch.v1()));
assertThat(request.getUri().getQuery(), equalTo(resourceQueryString()));
assertHeaders(request, customHeaders);
if (alreadyExists == false) {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + watch.v1()));
assertThat(request.getUri().getQuery(), equalTo(resourceQueryString()));
assertThat(request.getBody(), equalTo(watch.v2()));
assertHeaders(request, customHeaders);
}
// DELETE if we're not allowed to use it
} else {
assertThat(request.getMethod(), equalTo("DELETE"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + watch.v1()));
assertThat(request.getUri().getQuery(), equalTo(resourceQueryString()));
assertHeaders(request, customHeaders);
}
}
}
}
private void assertMonitorBackwardsCompatibilityAliases(final MockWebServer webServer, final boolean expectPost,
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath) throws Exception {
final String pathPrefix = basePathToAssertablePrefix(basePath);
@ -550,6 +608,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertThat(exporters, notNullValue());
// Wait for exporting bulks to be ready to export
assertBusy(() -> assertThat(clusterService().state().version(), not(ClusterState.UNKNOWN_VERSION)));
assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue())));
PlainActionFuture<Void> future = new PlainActionFuture<>();
exporters.export(docs, future);
@ -597,6 +656,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
return "filter_path=" + FILTER_PATH_NONE;
}
private String watcherCheckQueryString() {
return "filter_path=" + WATCHER_CHECK_PARAMETERS.get("filter_path");
}
private String bulkQueryString() {
return "pipeline=" + Exporter.EXPORT_PIPELINE_NAME + "&filter_path=" + "errors,items.*.error";
}
@ -614,11 +677,14 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
private void enqueueSetupResponses(MockWebServer webServer,
boolean typeMappingsAlreadyExist,
boolean templatesAlreadyExists, boolean pipelineAlreadyExists,
boolean remoteClusterAllowsWatcher, boolean currentLicenseAllowsWatcher,
boolean watcherAlreadyExists,
boolean bwcIndexesExist, boolean bwcAliasesExist) throws IOException {
enqueueMappingTypeResponses(webServer, typeMappingsAlreadyExist);
enqueueTemplateResponses(webServer, templatesAlreadyExists);
enqueuePipelineResponses(webServer, pipelineAlreadyExists);
enqueueBackwardsCompatibilityAliasResponse(webServer, bwcIndexesExist, bwcAliasesExist);
enqueueWatcherResponses(webServer, remoteClusterAllowsWatcher, currentLicenseAllowsWatcher, watcherAlreadyExists);
}
private void enqueueMappingTypeResponses(final MockWebServer webServer, final boolean alreadyExists) throws IOException {
@ -684,6 +750,65 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
enqueueResponse(webServer, 200, "pipeline [" + Exporter.EXPORT_PIPELINE_NAME + "] exists");
}
private void enqueueWatcherResponses(final MockWebServer webServer,
final boolean remoteClusterAllowsWatcher, final boolean currentLicenseAllowsWatcher,
final boolean alreadyExists) throws IOException {
// if the remote cluster doesn't allow watcher, then we only check for it and we're done
if (remoteClusterAllowsWatcher) {
// X-Pack exists and Watcher can be used
enqueueResponse(webServer, 200, "{\"features\":{\"watcher\":{\"available\":true,\"enabled\":true}}}");
// if we have an active license that's not Basic, then we should add watches
if (currentLicenseAllowsWatcher) {
if (alreadyExists) {
enqueuePutWatchResponsesExistsAlready(webServer);
} else {
enqueuePutWatchResponsesDoesNotExistYet(webServer);
}
// otherwise we need to delete them from the remote cluster
} else {
enqueueDeleteWatchResponses(webServer);
}
} else {
// X-Pack exists but Watcher just cannot be used
if (randomBoolean()) {
final String responseBody = randomFrom(
"{\"features\":{\"watcher\":{\"available\":false,\"enabled\":true}}}",
"{\"features\":{\"watcher\":{\"available\":true,\"enabled\":false}}}",
"{}"
);
enqueueResponse(webServer, 200, responseBody);
// X-Pack is not installed
} else {
enqueueResponse(webServer, 404, "{}");
}
}
}
private void enqueuePutWatchResponsesDoesNotExistYet(final MockWebServer webServer) throws IOException {
for (String watchId : monitoringWatchIds()) {
enqueueResponse(webServer, 404, "watch [" + watchId + "] does not exist");
enqueueResponse(webServer, 201, "watch [" + watchId + "] created");
}
}
private void enqueuePutWatchResponsesExistsAlready(final MockWebServer webServer) throws IOException {
for (String watchId : monitoringWatchIds()) {
enqueueResponse(webServer, 200, "watch [" + watchId + "] exists");
}
}
private void enqueueDeleteWatchResponses(final MockWebServer webServer) throws IOException {
for (String watchId : monitoringWatchIds()) {
if (randomBoolean()) {
enqueueResponse(webServer, 404, "watch [" + watchId + "] did not exist");
} else {
enqueueResponse(webServer, 200, "watch [" + watchId + "] deleted");
}
}
}
private void enqueueBackwardsCompatibilityAliasResponse(MockWebServer webServer, boolean bwcIndexesExist, boolean bwcAliasesExist)
throws IOException {
if (false == bwcIndexesExist && randomBoolean()) {

View File

@ -13,7 +13,12 @@ import org.elasticsearch.Version;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
@ -39,17 +44,26 @@ import static org.mockito.Mockito.when;
*/
public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTestCase {
private final ClusterState state = mockClusterState(true);
private final ClusterService clusterService = mockClusterService(state);
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
private final boolean remoteClusterHasWatcher = randomBoolean();
private final boolean validLicense = randomBoolean();
/**
* kibana, logstash, beats
*/
private final int EXPECTED_TYPES = MonitoringTemplateUtils.NEW_DATA_TYPES.length;
private final int EXPECTED_TEMPLATES = 6;
private final int EXPECTED_WATCHES = 4;
private final RestClient client = mock(RestClient.class);
private final Response versionResponse = mock(Response.class);
private final MultiHttpResource resources =
HttpExporter.createResources(new Exporter.Config("_http", "http", Settings.EMPTY), new ResolversRegistry(Settings.EMPTY));
HttpExporter.createResources(
new Exporter.Config("_http", "http", Settings.EMPTY, Settings.EMPTY, clusterService, licenseState),
new ResolversRegistry(Settings.EMPTY));
public void testInvalidVersionBlocks() throws IOException {
final HttpEntity entity = new StringEntity("{\"version\":{\"number\":\"unknown\"}}", ContentType.APPLICATION_JSON);
@ -339,7 +353,278 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
verifyNoMoreInteractions(client);
}
public void testSuccessfulChecks() throws IOException {
public void testWatcherCheckBlocksAfterSuccessfulPipelines() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final int successfulGetPipelines = randomIntBetween(0, 1);
final int unsuccessfulGetPipelines = 1 - successfulGetPipelines;
final Exception exception = failureGetException();
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(unsuccessfulGetTemplates);
whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines);
whenSuccessfulPutPipelines(1);
whenSuccessfulBackwardsCompatibilityAliases();
// there's only one check
when(client.performRequest(eq("GET"), eq("/_xpack"), anyMapOf(String.class, String.class))).thenThrow(exception);
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
verifyPutPipelines(unsuccessfulGetPipelines);
verifyBackwardsCompatibilityAliases();
verifyWatcherCheck();
verifyNoMoreInteractions(client);
}
public void testWatchCheckBlocksAfterSuccessfulWatcherCheck() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final int successfulGetPipelines = randomIntBetween(0, 1);
final int unsuccessfulGetPipelines = 1 - successfulGetPipelines;
final Exception exception = validLicense ? failureGetException() : failureDeleteException();
final boolean firstSucceeds = randomBoolean();
int expectedGets = 1;
int expectedPuts = 0;
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(unsuccessfulGetTemplates);
whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines);
whenSuccessfulPutPipelines(1);
whenSuccessfulBackwardsCompatibilityAliases();
whenWatcherCanBeUsed(validLicense);
// failure in the middle of various watches being checked/published; suggests a node dropped
if (firstSucceeds) {
// getting _and_ putting watches
if (validLicense) {
final boolean successfulFirst = randomBoolean();
// -2 from one success + a necessary failure after it!
final int extraPasses = randomIntBetween(0, EXPECTED_WATCHES - 2);
final int successful = randomIntBetween(0, extraPasses);
final int unsuccessful = extraPasses - successful;
final Response first = successfulFirst ? successfulGetResponse() : unsuccessfulGetResponse();
final List<Response> otherResponses = getResponses(successful, unsuccessful);
// last check fails implies that N - 2 publishes succeeded!
when(client.performRequest(eq("GET"), startsWith("/_xpack/watcher/watch/"), anyMapOf(String.class, String.class)))
.thenReturn(first, otherResponses.toArray(new Response[otherResponses.size()]))
.thenThrow(exception);
whenSuccessfulPutWatches(otherResponses.size() + 1);
// +1 for the "first"
expectedGets += 1 + successful + unsuccessful;
expectedPuts = (successfulFirst ? 0 : 1) + unsuccessful;
// deleting watches
} else {
// - 1 from necessary failure after it!
final int successful = randomIntBetween(1, EXPECTED_WATCHES - 1);
// there is no form of an unsuccessful delete; only success or error
final List<Response> responses = successfulDeleteResponses(successful);
when(client.performRequest(eq("DELETE"), startsWith("/_xpack/watcher/watch/"), anyMapOf(String.class, String.class)))
.thenReturn(responses.get(0), responses.subList(1, successful).toArray(new Response[successful - 1]))
.thenThrow(exception);
expectedGets += successful;
}
} else {
final String method = validLicense ? "GET" : "DELETE";
when(client.performRequest(eq(method), startsWith("/_xpack/watcher/watch/"), anyMapOf(String.class, String.class)))
.thenThrow(exception);
}
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
verifyPutPipelines(unsuccessfulGetPipelines);
verifyBackwardsCompatibilityAliases();
verifyWatcherCheck();
if (validLicense) {
verifyGetWatches(expectedGets);
verifyPutWatches(expectedPuts);
} else {
verifyDeleteWatches(expectedGets);
}
verifyNoMoreInteractions(client);
}
public void testWatchPublishBlocksAfterSuccessfulWatcherCheck() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final int successfulGetPipelines = randomIntBetween(0, 1);
final int unsuccessfulGetPipelines = 1 - successfulGetPipelines;
final Exception exception = failurePutException();
final boolean firstSucceeds = randomBoolean();
int expectedGets = 1;
int expectedPuts = 1;
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(unsuccessfulGetTemplates);
whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines);
whenSuccessfulPutPipelines(1);
whenSuccessfulBackwardsCompatibilityAliases();
// license needs to be valid, otherwise we'll do DELETEs, which are tested earlier
whenWatcherCanBeUsed(true);
// failure in the middle of various watches being checked/published; suggests a node dropped
if (firstSucceeds) {
final Response firstSuccess = successfulPutResponse();
// -2 from one success + a necessary failure after it!
final int extraPasses = randomIntBetween(0, EXPECTED_WATCHES - 2);
final int successful = randomIntBetween(0, extraPasses);
final int unsuccessful = extraPasses - successful;
final List<Response> otherResponses = successfulPutResponses(unsuccessful);
// first one passes for sure, so we need an extra "unsuccessful" GET
whenGetWatches(successful, unsuccessful + 2);
// previous publishes must have succeeded
when(client.performRequest(eq("PUT"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class),
any(HttpEntity.class)))
.thenReturn(firstSuccess, otherResponses.toArray(new Response[otherResponses.size()]))
.thenThrow(exception);
// GETs required for each PUT attempt (first is guaranteed "unsuccessful")
expectedGets += successful + unsuccessful + 1;
// unsuccessful are PUT attempts + the guaranteed successful PUT (first)
expectedPuts += unsuccessful + 1;
} else {
// fail the check so that it has to attempt the PUT
whenGetWatches(0, 1);
when(client.performRequest(eq("PUT"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class),
any(HttpEntity.class)))
.thenThrow(exception);
}
assertTrue(resources.isDirty());
assertFalse(resources.checkAndPublish(client));
// ensure it didn't magically become not-dirty
assertTrue(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
verifyPutPipelines(unsuccessfulGetPipelines);
verifyBackwardsCompatibilityAliases();
verifyWatcherCheck();
verifyGetWatches(expectedGets);
verifyPutWatches(expectedPuts);
verifyNoMoreInteractions(client);
}
public void testSuccessfulChecksOnElectedMasterNode() throws IOException {
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates;
final int successfulGetPipelines = randomIntBetween(0, 1);
final int unsuccessfulGetPipelines = 1 - successfulGetPipelines;
final int successfulGetWatches = randomIntBetween(0, EXPECTED_WATCHES);
final int unsuccessfulGetWatches = EXPECTED_WATCHES - successfulGetWatches;
whenValidVersionResponse();
whenGetTypeMappingResponse(successfulGetTypeMappings, unsuccessfulGetTypeMappings);
whenSuccessfulPutTypeMappings(EXPECTED_TYPES);
whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates);
whenSuccessfulPutTemplates(unsuccessfulGetTemplates);
whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines);
whenSuccessfulPutPipelines(1);
if (remoteClusterHasWatcher) {
whenWatcherCanBeUsed(validLicense);
if (validLicense) {
whenGetWatches(successfulGetWatches, unsuccessfulGetWatches);
whenSuccessfulPutWatches(unsuccessfulGetWatches);
} else {
whenSuccessfulDeleteWatches(EXPECTED_WATCHES);
}
} else {
whenWatcherCannotBeUsed();
}
whenSuccessfulBackwardsCompatibilityAliases();
assertTrue(resources.isDirty());
// it should be able to proceed!
assertTrue(resources.checkAndPublish(client));
assertFalse(resources.isDirty());
verifyVersionCheck();
verifyGetTypeMappings(EXPECTED_TYPES);
verifyPutTypeMappings(unsuccessfulGetTypeMappings);
verifyGetTemplates(EXPECTED_TEMPLATES);
verifyPutTemplates(unsuccessfulGetTemplates);
verifyGetPipelines(1);
verifyPutPipelines(unsuccessfulGetPipelines);
verifyWatcherCheck();
if (remoteClusterHasWatcher) {
if (validLicense) {
verifyGetWatches(EXPECTED_WATCHES);
verifyPutWatches(unsuccessfulGetWatches);
} else {
verifyDeleteWatches(EXPECTED_WATCHES);
}
}
verifyBackwardsCompatibilityAliases();
verifyNoMoreInteractions(client);
}
/**
* If the node is not the elected master node, then it should never check Watcher or send Watches (Cluster Alerts).
*/
public void testSuccessfulChecksIfNotElectedMasterNode() throws IOException {
final ClusterState state = mockClusterState(false);
final ClusterService clusterService = mockClusterService(state);
final MultiHttpResource resources =
HttpExporter.createResources(
new Exporter.Config("_http", "http", Settings.EMPTY, Settings.EMPTY, clusterService, licenseState),
new ResolversRegistry(Settings.EMPTY));
final int successfulGetTypeMappings = randomIntBetween(0, EXPECTED_TYPES);
final int unsuccessfulGetTypeMappings = EXPECTED_TYPES - successfulGetTypeMappings;
final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES);
@ -385,6 +670,12 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
return randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
}
private Exception failureDeleteException() {
final ResponseException responseException = responseException("DELETE", "/_delete_something", failedCheckStatus());
return randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
}
private Response successfulGetResponse() {
return response("GET", "/_get_something", successfulCheckStatus());
}
@ -469,6 +760,22 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
return responses;
}
private Response successfulDeleteResponse() {
final RestStatus status = randomFrom(successfulCheckStatus(), notFoundCheckStatus());
return response("DELETE", "/_delete_something", status);
}
private List<Response> successfulDeleteResponses(final int successful) {
final List<Response> responses = new ArrayList<>(successful);
for (int i = 0; i < successful; ++i) {
responses.add(successfulDeleteResponse());
}
return responses;
}
private void whenValidVersionResponse() throws IOException {
final HttpEntity entity = new StringEntity("{\"version\":{\"number\":\"" + Version.CURRENT + "\"}}", ContentType.APPLICATION_JSON);
@ -561,6 +868,88 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
}
}
private void whenWatcherCanBeUsed(final boolean validLicense) throws IOException {
final MetaData metaData = mock(MetaData.class);
when(state.metaData()).thenReturn(metaData);
when(metaData.clusterUUID()).thenReturn("the_clusters_uuid");
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(validLicense);
final HttpEntity entity =
new StringEntity("{\"features\":{\"watcher\":{\"enabled\":true,\"available\":true}}}", ContentType.APPLICATION_JSON);
final Response successfulGet = response("GET", "_xpack", successfulCheckStatus(), entity);
// empty is possible if they all exist
when(client.performRequest(eq("GET"), eq("/_xpack"), anyMapOf(String.class, String.class))).thenReturn(successfulGet);
}
private void whenWatcherCannotBeUsed() throws IOException {
final Response response;
if (randomBoolean()) {
final HttpEntity entity = randomFrom(
new StringEntity("{\"features\":{\"watcher\":{\"enabled\":false,\"available\":true}}}", ContentType.APPLICATION_JSON),
new StringEntity("{\"features\":{\"watcher\":{\"enabled\":true,\"available\":false}}}", ContentType.APPLICATION_JSON),
new StringEntity("{}", ContentType.APPLICATION_JSON)
);
response = response("GET", "_xpack", successfulCheckStatus(), entity);
} else {
response = response("GET", "_xpack", notFoundCheckStatus());
}
// empty is possible if they all exist
when(client.performRequest(eq("GET"), eq("/_xpack"), anyMapOf(String.class, String.class))).thenReturn(response);
}
private void whenGetWatches(final int successful, final int unsuccessful) throws IOException {
final List<Response> gets = getResponses(successful, unsuccessful);
if (gets.size() == 1) {
when(client.performRequest(eq("GET"), startsWith("/_xpack/watcher/watch/"), anyMapOf(String.class, String.class)))
.thenReturn(gets.get(0));
} else {
when(client.performRequest(eq("GET"), startsWith("/_xpack/watcher/watch/"), anyMapOf(String.class, String.class)))
.thenReturn(gets.get(0), gets.subList(1, gets.size()).toArray(new Response[gets.size() - 1]));
}
}
private void whenSuccessfulPutWatches(final int successful) throws IOException {
final List<Response> successfulPuts = successfulPutResponses(successful);
// empty is possible if they all exist
if (successful == 1) {
when(client.performRequest(eq("PUT"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class),
any(HttpEntity.class)))
.thenReturn(successfulPuts.get(0));
} else if (successful > 1) {
when(client.performRequest(eq("PUT"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class),
any(HttpEntity.class)))
.thenReturn(successfulPuts.get(0), successfulPuts.subList(1, successful).toArray(new Response[successful - 1]));
}
}
private void whenSuccessfulDeleteWatches(final int successful) throws IOException {
final List<Response> successfulDeletes = successfulDeleteResponses(successful);
// empty is possible if they all exist
if (successful == 1) {
when(client.performRequest(eq("DELETE"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class)))
.thenReturn(successfulDeletes.get(0));
} else if (successful > 1) {
when(client.performRequest(eq("DELETE"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class)))
.thenReturn(successfulDeletes.get(0), successfulDeletes.subList(1, successful).toArray(new Response[successful - 1]));
}
}
private void whenSuccessfulBackwardsCompatibilityAliases() throws IOException {
// Just return no indexes so we won't have to mock adding aliases
@ -615,7 +1004,49 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe
any(HttpEntity.class)); // raw template
}
private void verifyWatcherCheck() throws IOException {
verify(client).performRequest(eq("GET"), eq("/_xpack"), anyMapOf(String.class, String.class));
}
private void verifyDeleteWatches(final int called) throws IOException {
verify(client, times(called)).performRequest(eq("DELETE"), // method
startsWith("/_xpack/watcher/watch/"), // endpoint
anyMapOf(String.class, String.class));// parameters (e.g., timeout)
}
private void verifyGetWatches(final int called) throws IOException {
verify(client, times(called)).performRequest(eq("GET"),
startsWith("/_xpack/watcher/watch/"),
anyMapOf(String.class, String.class));
}
private void verifyPutWatches(final int called) throws IOException {
verify(client, times(called)).performRequest(eq("PUT"), // method
startsWith("/_xpack/watcher/watch/"), // endpoint
anyMapOf(String.class, String.class), // parameters (e.g., timeout)
any(HttpEntity.class)); // raw template
}
private void verifyBackwardsCompatibilityAliases() throws IOException {
verify(client).performRequest(eq("GET"), startsWith("/.marvel-es-1-*"), anyMapOf(String.class, String.class));
}
private ClusterService mockClusterService(final ClusterState state) {
final ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(state);
return clusterService;
}
private ClusterState mockClusterState(final boolean electedMaster) {
final ClusterState state = mock(ClusterState.class);
final DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(state.nodes()).thenReturn(nodes);
when(nodes.isLocalNodeElectedMaster()).thenReturn(electedMaster);
return state;
}
}

View File

@ -11,20 +11,28 @@ import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.Exporter.Config;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.ResolversRegistry;
import org.elasticsearch.xpack.ssl.SSLService;
import org.junit.Before;
import org.mockito.InOrder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -51,9 +59,25 @@ import static org.mockito.Mockito.when;
*/
public class HttpExporterTests extends ESTestCase {
private final ClusterService clusterService = mock(ClusterService.class);
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
private final MetaData metaData = mock(MetaData.class);
private final SSLService sslService = mock(SSLService.class);
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
@Before
public void setupClusterState() {
final ClusterState clusterState = mock(ClusterState.class);
final DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.metaData()).thenReturn(metaData);
when(clusterState.nodes()).thenReturn(nodes);
// always let the watcher resources run for these tests; HttpExporterResourceTests tests it flipping on/off
when(nodes.isLocalNodeElectedMaster()).thenReturn(true);
}
public void testExporterWithBlacklistedHeaders() {
final String blacklistedHeader = randomFrom(HttpExporter.BLACKLISTED_HEADERS);
final String expected = "[" + blacklistedHeader + "] cannot be overwritten via [xpack.monitoring.exporters._http.headers]";
@ -263,6 +287,7 @@ public class HttpExporterTests extends ESTestCase {
public void testCreateResources() {
final boolean useIngest = randomBoolean();
final boolean clusterAlertManagement = randomBoolean();
final TimeValue templateTimeout = randomFrom(TimeValue.timeValueSeconds(30), null);
final TimeValue pipelineTimeout = randomFrom(TimeValue.timeValueSeconds(30), null);
final TimeValue aliasTimeout = randomFrom(TimeValue.timeValueSeconds(30), null);
@ -274,6 +299,10 @@ public class HttpExporterTests extends ESTestCase {
builder.put("xpack.monitoring.exporters._http.use_ingest", false);
}
if (clusterAlertManagement == false) {
builder.put("xpack.monitoring.exporters._http.cluster_alerts.management.enabled", false);
}
if (templateTimeout != null) {
builder.put("xpack.monitoring.exporters._http.index.template.master_timeout", templateTimeout.getStringRep());
}
@ -305,6 +334,19 @@ public class HttpExporterTests extends ESTestCase {
resources.stream().filter((resource) -> resource instanceof PipelineHttpResource)
.map(PipelineHttpResource.class::cast)
.collect(Collectors.toList());
final List<WatcherExistsHttpResource> watcherCheck =
resources.stream().filter((resource) -> resource instanceof WatcherExistsHttpResource)
.map(WatcherExistsHttpResource.class::cast)
.collect(Collectors.toList());
final List<ClusterAlertHttpResource> watches;
if (watcherCheck.isEmpty()) {
watches = Collections.emptyList();
} else {
watches = watcherCheck.get(0).getWatches().getResources()
.stream().filter((resource) -> resource instanceof ClusterAlertHttpResource)
.map(ClusterAlertHttpResource.class::cast)
.collect(Collectors.toList());
}
final List<BackwardsCompatibilityAliasesResource> bwc =
resources.stream().filter(resource -> resource instanceof BackwardsCompatibilityAliasesResource)
.map(BackwardsCompatibilityAliasesResource.class::cast)
@ -312,11 +354,13 @@ public class HttpExporterTests extends ESTestCase {
// expected number of resources
assertThat(multiResource.getResources().size(),
equalTo(version + typeMappings.size() + templates.size() + pipelines.size() + bwc.size()));
equalTo(version + typeMappings.size() + templates.size() + pipelines.size() + watcherCheck.size() + bwc.size()));
assertThat(version, equalTo(1));
assertThat(typeMappings, hasSize(MonitoringTemplateUtils.NEW_DATA_TYPES.length));
assertThat(templates, hasSize(6));
assertThat(pipelines, hasSize(useIngest ? 1 : 0));
assertThat(watcherCheck, hasSize(clusterAlertManagement ? 1 : 0));
assertThat(watches, hasSize(clusterAlertManagement ? ClusterAlertsUtil.WATCH_IDS.length : 0));
assertThat(bwc, hasSize(1));
// timeouts
@ -446,8 +490,8 @@ public class HttpExporterTests extends ESTestCase {
* @param settings The settings to select the exporter's settings from
* @return Never {@code null}.
*/
private static Config createConfig(Settings settings) {
return new Config("_http", HttpExporter.TYPE, settings.getAsSettings(exporterName()));
private Config createConfig(final Settings settings) {
return new Config("_http", HttpExporter.TYPE, settings, settings.getAsSettings(exporterName()), clusterService, licenseState);
}
private static String exporterName() {

View File

@ -73,6 +73,13 @@ public class MultiHttpResourceTests extends ESTestCase {
}
}
public void testGetResources() {
final List<MockHttpResource> allResources = successfulResources();
final MultiHttpResource multiResource = new MultiHttpResource(owner, allResources);
assertThat(multiResource.getResources(), equalTo(allResources));
}
private List<MockHttpResource> successfulResources() {
final int successful = randomIntBetween(2, 5);
final List<MockHttpResource> resources = new ArrayList<>(successful);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
@ -24,7 +25,6 @@ import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ -58,12 +58,12 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
final RestStatus failedStatus = failedCheckStatus();
final Response response = response("GET", endpoint, failedStatus);
when(client.performRequest("GET", endpoint, resource.getParameters())).thenReturn(response);
when(client.performRequest("GET", endpoint, getParameters(resource.getParameters()))).thenReturn(response);
sometimesAssertSimpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, CheckResponse.ERROR, response);
verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("GET", endpoint, resource.getParameters());
verify(client).performRequest("GET", endpoint, getParameters(resource.getParameters()));
verify(logger).error(any(org.apache.logging.log4j.util.Supplier.class), any(ResponseException.class));
verifyNoMoreInteractions(client, logger);
@ -76,12 +76,12 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
final Exception e = randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
final Response response = e == responseException ? responseException.getResponse() : null;
when(client.performRequest("GET", endpoint, resource.getParameters())).thenThrow(e);
when(client.performRequest("GET", endpoint, getParameters(resource.getParameters()))).thenThrow(e);
sometimesAssertSimpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, CheckResponse.ERROR, response);
verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("GET", endpoint, resource.getParameters());
verify(client).performRequest("GET", endpoint, getParameters(resource.getParameters()));
verify(logger).error(any(org.apache.logging.log4j.util.Supplier.class), eq(e));
verifyNoMoreInteractions(client, logger);
@ -110,6 +110,34 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
verifyNoMoreInteractions(client, logger);
}
public void testDeleteResourceTrue() throws IOException {
final RestStatus status = randomFrom(successfulCheckStatus(), notFoundCheckStatus());
assertDeleteResource(status, true);
}
public void testDeleteResourceFalse() throws IOException {
assertDeleteResource(failedCheckStatus(), false);
}
public void testDeleteResourceErrors() throws IOException {
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final RestStatus failedStatus = failedCheckStatus();
final ResponseException responseException = responseException("DELETE", endpoint, failedStatus);
final Exception e = randomFrom(new IOException("expected"), new RuntimeException("expected"), responseException);
final Map<String, String> deleteParameters = deleteParameters(resource.getParameters());
when(client.performRequest("DELETE", endpoint, deleteParameters)).thenThrow(e);
assertThat(resource.deleteResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType), is(false));
verify(logger).trace("deleting {} [{}] from the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("DELETE", endpoint, deleteParameters);
verify(logger).error(any(org.apache.logging.log4j.util.Supplier.class), eq(e));
verifyNoMoreInteractions(client, logger);
}
public void testParameters() {
assertParameters(resource);
}
@ -138,19 +166,18 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final Response response = response("GET", endpoint, status);
when(client.performRequest("GET", endpoint, resource.getParameters())).thenReturn(response);
when(client.performRequest("GET", endpoint, getParameters(resource.getParameters()))).thenReturn(response);
sometimesAssertSimpleCheckForResource(client, logger, resourceBasePath, resourceName, resourceType, expected, response);
verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType);
verify(client).performRequest("GET", endpoint, resource.getParameters());
verify(client).performRequest("GET", endpoint, getParameters(resource.getParameters()));
if (expected == CheckResponse.EXISTS) {
if (expected == CheckResponse.EXISTS || expected == CheckResponse.DOES_NOT_EXIST) {
verify(response).getStatusLine();
} else {
// 3 times because it also is used in the exception message
verify(response, times(3)).getStatusLine();
verify(response, times(2)).getRequestLine();
verify(response).getStatusLine();
verify(response).getRequestLine();
verify(response).getHost();
verify(response).getEntity();
}
@ -198,11 +225,40 @@ public class PublishableHttpResourceTests extends AbstractPublishableHttpResourc
is(expected));
} else {
final Tuple<CheckResponse, Response> responseTuple =
resource.checkForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType);
resource.checkForResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType,
PublishableHttpResource.GET_EXISTS, PublishableHttpResource.GET_DOES_NOT_EXIST);
assertThat(responseTuple.v1(), is(expected));
assertThat(responseTuple.v2(), is(response));
}
}
private void assertDeleteResource(final RestStatus status, final boolean expected) throws IOException {
final String endpoint = concatenateEndpoint(resourceBasePath, resourceName);
final Response response = response("DELETE", endpoint, status);
final Map<String, String> deleteParameters = deleteParameters(resource.getParameters());
when(client.performRequest("DELETE", endpoint, deleteParameters)).thenReturn(response);
assertThat(resource.deleteResource(client, logger, resourceBasePath, resourceName, resourceType, owner, ownerType), is(expected));
verify(client).performRequest("DELETE", endpoint, deleteParameters);
verify(response).getStatusLine();
verify(logger).trace("deleting {} [{}] from the [{}] {}", resourceType, resourceName, owner, ownerType);
if (expected) {
verify(logger).debug("{} [{}] deleted from the [{}] {}", resourceType, resourceName, owner, ownerType);
} else {
ArgumentCaptor<RuntimeException> e = ArgumentCaptor.forClass(RuntimeException.class);
verify(logger).error(any(org.apache.logging.log4j.util.Supplier.class), e.capture());
assertThat(e.getValue().getMessage(),
is("[" + resourceBasePath + "/" + resourceName + "] responded with [" + status.getStatus() + "]"));
}
verifyNoMoreInteractions(client, response, logger, entity);
}
}

View File

@ -26,7 +26,7 @@ public class TemplateHttpResourceTests extends AbstractPublishableHttpResourceTe
private final TemplateHttpResource resource = new TemplateHttpResource(owner, masterTimeout, templateName, template);
public void testPipelineToHttpEntity() throws IOException {
public void testTemplateToHttpEntity() throws IOException {
final byte[] templateValueBytes = templateValue.getBytes(ContentType.APPLICATION_JSON.getCharset());
final HttpEntity entity = resource.templateToHttpEntity();

View File

@ -0,0 +1,192 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.exporter.http;
import java.util.Collections;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.CheckResponse;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.GET_EXISTS;
import static org.elasticsearch.xpack.monitoring.exporter.http.WatcherExistsHttpResource.XPACK_DOES_NOT_EXIST;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
/**
* Tests {@link WatcherExistsHttpResource}.
*/
public class WatcherExistsHttpResourceTests extends AbstractPublishableHttpResourceTestCase {
private final ClusterService clusterService = mock(ClusterService.class);
private final MultiHttpResource watches = mock(MultiHttpResource.class);
private final WatcherExistsHttpResource resource = new WatcherExistsHttpResource(owner, clusterService, watches);
private final Map<String, String> expectedParameters = getParameters(resource.getParameters(), GET_EXISTS, XPACK_DOES_NOT_EXIST);
public void testDoCheckIgnoresClientWhenNotElectedMaster() {
whenNotElectedMaster();
assertThat(resource.doCheck(client), is(CheckResponse.EXISTS));
verifyZeroInteractions(client);
}
public void testDoCheckExistsFor404() throws IOException {
whenElectedMaster();
// /_xpack returning a 404 means ES didn't handle the request properly and X-Pack doesn't exist
doCheckWithStatusCode(resource, "", "_xpack", notFoundCheckStatus(),
GET_EXISTS, XPACK_DOES_NOT_EXIST, CheckResponse.EXISTS);
}
public void testDoCheckExistsFor400() throws IOException {
whenElectedMaster();
// /_xpack returning a 400 means X-Pack does not exist
doCheckWithStatusCode(resource, "", "_xpack", RestStatus.BAD_REQUEST,
GET_EXISTS, XPACK_DOES_NOT_EXIST, CheckResponse.EXISTS);
}
public void testDoCheckExistsAsElectedMaster() throws IOException {
whenElectedMaster();
final String[] noWatcher = {
"{}",
"{\"features\":{\"watcher\":{\"available\":true,\"enabled\":false}}}",
"{\"features\":{\"watcher\":{\"available\":false,\"enabled\":true}}}",
"{\"features\":{\"watcher\":{\"available\":true}}}",
"{\"features\":{\"watcher\":{\"enabled\":true}}}"
};
final String endpoint = "/_xpack";
// success only implies that it responded; it also needs to be available and enabled
final Response response = response("GET", endpoint, successfulCheckStatus());
final HttpEntity responseEntity = new StringEntity(randomFrom(noWatcher), ContentType.APPLICATION_JSON);
when(response.getEntity()).thenReturn(responseEntity);
// returning EXISTS implies that we CANNOT use Watcher to avoid running the publish phase
doCheckWithStatusCode(resource, expectedParameters, endpoint, CheckResponse.EXISTS, response);
verify(response).getEntity();
}
public void testDoCheckDoesNotExist() throws IOException {
whenElectedMaster();
final String[] hasWatcher = {
"{\"features\":{\"watcher\":{\"available\":true,\"enabled\":true}}}",
"{\"features\":{\"watcher\":{\"enabled\":true,\"available\":true}}}"
};
final String endpoint = "/_xpack";
// success only implies that it responded; it also needs to be available and enabled
final Response response = response("GET", endpoint, successfulCheckStatus());
final HttpEntity responseEntity = new StringEntity(randomFrom(hasWatcher), ContentType.APPLICATION_JSON);
when(response.getEntity()).thenReturn(responseEntity);
// returning DOES_NOT_EXIST implies that we CAN use Watcher and need to run the publish phase
doCheckWithStatusCode(resource, expectedParameters, endpoint, CheckResponse.DOES_NOT_EXIST, response);
verify(response).getEntity();
}
public void testDoCheckErrorWithDataException() throws IOException {
whenElectedMaster();
final String[] errorWatcher = {
"{\"features\":{}}", // missing watcher object 'string'
"{\"watcher\":{\"enabled\":true,\"available\":true}}", // missing features outer object
"{{}" // extra {
};
final String endpoint = "/_xpack";
// success only implies that it responded; it also needs to be available and enabled
final Response response = response("GET", endpoint, successfulCheckStatus());
final HttpEntity responseEntity = new StringEntity(randomFrom(errorWatcher), ContentType.APPLICATION_JSON);
when(response.getEntity()).thenReturn(responseEntity);
// returning DOES_NOT_EXIST implies that we CAN use Watcher and need to run the publish phase
doCheckWithStatusCode(resource, endpoint, CheckResponse.ERROR, response);
}
public void testDoCheckErrorWithResponseException() throws IOException {
whenElectedMaster();
assertCheckWithException(resource, "", "_xpack");
}
public void testDoPublishTrue() throws IOException {
final CheckResponse checkResponse = randomFrom(CheckResponse.EXISTS, CheckResponse.DOES_NOT_EXIST);
final boolean publish = checkResponse == CheckResponse.DOES_NOT_EXIST;
final MockHttpResource mockWatch = new MockHttpResource(owner, randomBoolean(), checkResponse, publish);
final MultiHttpResource watches = new MultiHttpResource(owner, Collections.singletonList(mockWatch));
final WatcherExistsHttpResource resource = new WatcherExistsHttpResource(owner, clusterService, watches);
assertTrue(resource.doPublish(client));
assertThat(mockWatch.checked, is(1));
assertThat(mockWatch.published, is(publish ? 1 : 0));
}
public void testDoPublishFalse() throws IOException {
final CheckResponse checkResponse = randomFrom(CheckResponse.DOES_NOT_EXIST, CheckResponse.ERROR);
final MockHttpResource mockWatch = new MockHttpResource(owner, true, checkResponse, false);
final MultiHttpResource watches = new MultiHttpResource(owner, Collections.singletonList(mockWatch));
final WatcherExistsHttpResource resource = new WatcherExistsHttpResource(owner, clusterService, watches);
assertFalse(resource.doPublish(client));
assertThat(mockWatch.checked, is(1));
assertThat(mockWatch.published, is(checkResponse == CheckResponse.DOES_NOT_EXIST ? 1 : 0));
}
public void testParameters() {
final Map<String, String> parameters = resource.getParameters();
assertThat(parameters.get("filter_path"), is(WatcherExistsHttpResource.WATCHER_CHECK_PARAMETERS.get("filter_path")));
assertThat(parameters.size(), is(1));
}
public void testGetResources() {
assertThat(resource.getWatches(), sameInstance(watches));
}
private void whenElectedMaster() {
final ClusterState state = mock(ClusterState.class);
final DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(clusterService.state()).thenReturn(state);
when(state.nodes()).thenReturn(nodes);
when(nodes.isLocalNodeElectedMaster()).thenReturn(true);
}
private void whenNotElectedMaster() {
final ClusterState state = mock(ClusterState.class);
final DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(clusterService.state()).thenReturn(state);
when(state.nodes()).thenReturn(nodes);
when(nodes.isLocalNodeElectedMaster()).thenReturn(false);
}
}

View File

@ -27,15 +27,20 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.XPackClient;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkDoc;
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkRequestBuilder;
import org.elasticsearch.xpack.monitoring.action.MonitoringIndex;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchResponse;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@ -51,6 +56,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -65,7 +71,23 @@ import static org.hamcrest.Matchers.greaterThan;
public class LocalExporterTests extends MonitoringIntegTestCase {
private static SetOnce<String> indexTimeFormat = new SetOnce<>();
private SetOnce<String> indexTimeFormat = new SetOnce<>();
private static Boolean ENABLE_WATCHER;
@AfterClass
public static void cleanUpStatic() {
ENABLE_WATCHER = null;
}
@Override
protected boolean enableWatcher() {
if (ENABLE_WATCHER == null) {
ENABLE_WATCHER = randomBoolean();
}
return ENABLE_WATCHER;
}
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
@ -85,14 +107,10 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
.put("xpack.monitoring.exporters._local.enabled", false)
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
.put(XPackSettings.WATCHER_ENABLED.getKey(), enableWatcher())
.build();
}
@AfterClass
public static void cleanUp() {
indexTimeFormat = null;
}
@After
public void stopMonitoring() throws Exception {
// We start by disabling the monitoring service, so that no more collection are started
@ -130,7 +148,6 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
.putNull("xpack.monitoring.exporters._local.index.name.time_format")));
}
@TestLogging("org.elasticsearch.xpack.monitoring:TRACE")
public void testExport() throws Exception {
if (randomBoolean()) {
// indexing some random documents
@ -144,7 +161,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
if (randomBoolean()) {
// create some marvel indices to check if aliases are correctly created
final int oldies = randomIntBetween(1, 20);
final int oldies = randomIntBetween(1, 5);
for (int i = 0; i < oldies; i++) {
assertAcked(client().admin().indices().prepareCreate(".marvel-es-1-2014.12." + i)
.setSettings("number_of_shards", 1, "number_of_replicas", 0).get());
@ -251,6 +268,7 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
checkMonitoringPipeline();
checkMonitoringAliases();
checkMonitoringMappings();
checkMonitoringWatches();
checkMonitoringDocs();
}
@ -313,6 +331,23 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
}
}
/**
* Checks that the local exporter correctly creates Watches.
*/
private void checkMonitoringWatches() throws ExecutionException, InterruptedException {
if (enableWatcher()) {
final XPackClient xpackClient = new XPackClient(client());
final WatcherClient watcher = xpackClient.watcher();
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService(), watchId);
final GetWatchResponse response = watcher.getWatch(new GetWatchRequest(uniqueWatchId)).get();
assertTrue("watch [" + watchId + "] should exist", response.isFound());
}
}
}
/**
* Checks that the monitoring documents all have the cluster_uuid, timestamp and source_node
* fields and belongs to the right data or timestamped index.

View File

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.test;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.ScriptEngineService;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
/**
* A mock script engine that registers itself under the 'painless' name so that watches that use it can still be used in tests.
*/
public class MockPainlessScriptEngine extends MockScriptEngine {
public static final String NAME = "painless";
public static class TestPlugin extends MockScriptPlugin {
@Override
public ScriptEngineService getScriptEngineService(Settings settings) {
return new MockPainlessScriptEngine();
}
@Override
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
return Collections.emptyMap();
}
}
@Override
public String getType() {
return NAME;
}
@Override
public String getExtension() {
return NAME;
}
@Override
public Object compile(String name, String script, Map<String, String> params) {
// We always return the script's source as it is
return new MockCompiledScript(name, params, script, null);
}
@Override
public boolean isInlineScriptEnabled() {
return true;
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
@ -36,6 +37,7 @@ import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringService;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.client.MonitoringClient;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
@ -93,23 +95,25 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
*/
protected Boolean watcherEnabled;
private void randomizeSettings() {
if (watcherEnabled == null) {
watcherEnabled = enableWatcher();
}
}
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
if (securityEnabled == null) {
securityEnabled = enableSecurity();
}
if (watcherEnabled == null) {
watcherEnabled = enableWatcher();
}
logger.debug("--> security {}", securityEnabled ? "enabled" : "disabled");
logger.debug("--> watcher {}", watcherEnabled ? "enabled" : "disabled");
return super.buildTestCluster(scope, seed);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
randomizeSettings();
Settings.Builder builder = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(XPackSettings.WATCHER_ENABLED.getKey(), watcherEnabled)
@ -126,6 +130,8 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
@Override
protected Settings transportClientSettings() {
randomizeSettings();
if (securityEnabled) {
return Settings.builder()
.put(super.transportClientSettings())
@ -133,10 +139,12 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
.put(Security.USER_SETTING.getKey(), "test:changeme")
.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME4)
.put(NetworkModule.HTTP_TYPE_KEY, Security.NAME4)
.put(XPackSettings.WATCHER_ENABLED.getKey(), watcherEnabled)
.build();
}
return Settings.builder().put(super.transportClientSettings())
.put("xpack.security.enabled", false)
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
.put(XPackSettings.WATCHER_ENABLED.getKey(), watcherEnabled)
.build();
}
@ -150,7 +158,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(XPackPlugin.class);
return Arrays.asList(XPackPlugin.class, MockPainlessScriptEngine.TestPlugin.class);
}
@Override
@ -281,6 +289,21 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
return templates;
}
protected List<Tuple<String, String>> monitoringWatches() {
final ClusterService clusterService = clusterService();
return Arrays.stream(ClusterAlertsUtil.WATCH_IDS)
.map(id -> new Tuple<>(ClusterAlertsUtil.createUniqueWatchId(clusterService, id),
ClusterAlertsUtil.loadWatch(clusterService, id)))
.collect(Collectors.toList());
}
protected List<String> monitoringWatchIds() {
return Arrays.stream(ClusterAlertsUtil.WATCH_IDS)
.map(id -> ClusterAlertsUtil.createUniqueWatchId(clusterService(), id))
.collect(Collectors.toList());
}
protected void assertTemplateInstalled(String name) {
boolean found = false;
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates().get().getIndexTemplates()) {
@ -442,6 +465,8 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
" 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," +
" 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," +
" 'cluster:admin/ingest/pipeline/get', 'cluster:admin/ingest/pipeline/put', 'cluster:admin/ingest/pipeline/delete'," +
" 'cluster:monitor/xpack/watcher/watch/get', 'cluster:monitor/xpack/watcher/watch/put', " +
" 'cluster:monitor/xpack/watcher/watch/delete'," +
" 'cluster:monitor/task', 'cluster:admin/xpack/monitoring/bulk' ]\n" +
" indices:\n" +
" - names: '*'\n" +

View File

@ -221,9 +221,17 @@ public class ReservedRolesStoreTests extends ESTestCase {
assertThat(remoteMonitoringAgentRole.cluster().check(ClusterStatsAction.NAME), is(true));
assertThat(remoteMonitoringAgentRole.cluster().check(PutIndexTemplateAction.NAME), is(true));
assertThat(remoteMonitoringAgentRole.cluster().check(ClusterRerouteAction.NAME), is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(ClusterUpdateSettingsAction.NAME),
is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(ClusterUpdateSettingsAction.NAME), is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(MonitoringBulkAction.NAME), is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(GetWatchAction.NAME), is(true));
assertThat(remoteMonitoringAgentRole.cluster().check(PutWatchAction.NAME), is(true));
assertThat(remoteMonitoringAgentRole.cluster().check(DeleteWatchAction.NAME), is(true));
assertThat(remoteMonitoringAgentRole.cluster().check(ExecuteWatchAction.NAME), is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(AckWatchAction.NAME), is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(ActivateWatchAction.NAME), is(false));
assertThat(remoteMonitoringAgentRole.cluster().check(WatcherServiceAction.NAME), is(false));
// we get this from the cluster:monitor privilege
assertThat(remoteMonitoringAgentRole.cluster().check(WatcherStatsAction.NAME), is(true));
assertThat(remoteMonitoringAgentRole.runAs().check(randomAlphaOfLengthBetween(1, 12)), is(false));

View File

@ -46,6 +46,7 @@
- do:
search:
index: .watches
body: { "query": { "term": { "_id": "my_watch" } } }
- match: { hits.total: 0 }
---

View File

@ -50,6 +50,7 @@ teardown:
- do:
search:
index: .watches
body: { "query": { "term": { "_id": "my_watch" } } }
- match: { hits.total: 1 }
- do: