diff --git a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java
index 099dc4f7aaa..275ba38b7d5 100644
--- a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java
+++ b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java
@@ -195,6 +195,11 @@ public class XPackLicenseState {
listeners.add(Objects.requireNonNull(runnable));
}
+ /** Remove a listener */
+ public void removeListener(Runnable runnable) {
+ listeners.remove(runnable);
+ }
+
/** Return the current license type. */
public OperationMode getOperationMode() {
return status.mode;
@@ -326,12 +331,22 @@ public class XPackLicenseState {
/**
* Monitoring is always available as long as there is a valid license
*
- * @return true
+ * @return true if the license is active
*/
public boolean isMonitoringAllowed() {
return status.active;
}
+ /**
+ * Monitoring Cluster Alerts requires the equivalent license to use Watcher.
+ *
+ * @return {@link #isWatcherAllowed()}
+ * @see #isWatcherAllowed()
+ */
+ public boolean isMonitoringClusterAlertsAllowed() {
+ return isWatcherAllowed();
+ }
+
/**
* Determine if the current license allows the retention of indices to be modified.
*
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
index 532156b420b..e442a928745 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
@@ -114,8 +114,8 @@ public class Monitoring implements ActionPlugin {
final SSLService dynamicSSLService = sslService.createDynamicSSLService();
Map exporterFactories = new HashMap<>();
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService, threadPool.getThreadContext()));
- exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, cleanerService));
- final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, threadPool.getThreadContext());
+ exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, cleanerService));
+ final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, licenseState, threadPool.getThreadContext());
Set collectors = new HashSet<>();
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java
new file mode 100644
index 00000000000..d597c6e92f2
--- /dev/null
+++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.exporter;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.io.Streams;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * {@code ClusterAlertsUtil} provides static methods to easily load the JSON resources that
+ * represent watches for Cluster Alerts.
+ */
+public class ClusterAlertsUtil {
+
+ /**
+ * The name of the Watch resource when substituted by the high-level watch ID.
+ */
+ private static final String WATCH_FILE = "/monitoring/watches/%s.json";
+ /**
+ * Replace the ${monitoring.watch.cluster_uuid}
field in the watches.
+ */
+ private static final Pattern CLUSTER_UUID_PROPERTY =
+ Pattern.compile(Pattern.quote("${monitoring.watch.cluster_uuid}"));
+ /**
+ * Replace the ${monitoring.watch.id}
field in the watches.
+ */
+ private static final Pattern WATCH_ID_PROPERTY =
+ Pattern.compile(Pattern.quote("${monitoring.watch.id}"));
+ /**
+ * Replace the ${monitoring.watch.unique_id}
field in the watches.
+ *
+ * @see #createUniqueWatchId(ClusterService, String)
+ */
+ private static final Pattern UNIQUE_WATCH_ID_PROPERTY =
+ Pattern.compile(Pattern.quote("${monitoring.watch.unique_id}"));
+
+ /**
+ * An unsorted list of Watch IDs representing resource files for Monitoring Cluster Alerts.
+ */
+ public static final String[] WATCH_IDS = {
+ "elasticsearch_cluster_status",
+ "elasticsearch_version_mismatch",
+ "kibana_version_mismatch",
+ "logstash_version_mismatch"
+ };
+
+ /**
+ * Create a unique identifier for the watch and cluster.
+ *
+ * @param clusterService The cluster service used to fetch the latest cluster state.
+ * @param watchId The watch's ID.
+ * @return Never {@code null}.
+ * @see #WATCH_IDS
+ */
+ public static String createUniqueWatchId(final ClusterService clusterService, final String watchId) {
+ return createUniqueWatchId(clusterService.state().metaData().clusterUUID(), watchId);
+ }
+
+ /**
+ * Create a unique identifier for the watch and cluster.
+ *
+ * @param clusterUuid The cluster's UUID.
+ * @param watchId The watch's ID.
+ * @return Never {@code null}.
+ * @see #WATCH_IDS
+ */
+ private static String createUniqueWatchId(final String clusterUuid, final String watchId) {
+ return clusterUuid + "_" + watchId;
+ }
+
+ /**
+ * Create a unique watch ID and load the {@code watchId} resource by replacing variables,
+ * such as the cluster's UUID.
+ *
+ * @param clusterService The cluster service used to fetch the latest cluster state.
+ * @param watchId The watch's ID.
+ * @return Never {@code null}. The key is the unique watch ID. The value is the Watch source.
+ * @throws RuntimeException if the watch does not exist
+ */
+ public static String loadWatch(final ClusterService clusterService, final String watchId) {
+ final String resource = String.format(Locale.ROOT, WATCH_FILE, watchId);
+
+ try {
+ final String clusterUuid = clusterService.state().metaData().clusterUUID();
+ final String uniqueWatchId = createUniqueWatchId(clusterUuid, watchId);
+
+ // load the resource as-is
+ String source = loadResource(resource).utf8ToString();
+
+ source = CLUSTER_UUID_PROPERTY.matcher(source).replaceAll(clusterUuid);
+ source = WATCH_ID_PROPERTY.matcher(source).replaceAll(watchId);
+ source = UNIQUE_WATCH_ID_PROPERTY.matcher(source).replaceAll(uniqueWatchId);
+
+ return source;
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to load Watch [" + watchId + "]", e);
+ }
+ }
+
+ private static BytesReference loadResource(final String resource) throws IOException {
+ try (InputStream is = ClusterAlertsUtil.class.getResourceAsStream(resource)) {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ Streams.copy(is, out);
+
+ return new BytesArray(out.toByteArray());
+ }
+ }
+ }
+
+}
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java
index afe194279f4..b803b8ae2bd 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java
@@ -5,9 +5,11 @@
*/
package org.elasticsearch.xpack.monitoring.exporter;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import java.io.IOException;
@@ -27,6 +29,10 @@ public abstract class Exporter implements AutoCloseable {
* Note: disabling it obviously loses any benefit of using it, but it does allow clusters that don't run with ingest to not use it.
*/
public static final String USE_INGEST_PIPELINE_SETTING = "use_ingest";
+ /**
+ * Every {@code Exporter} allows users to explicitly disable cluster alerts.
+ */
+ public static final String CLUSTER_ALERTS_MANAGEMENT_SETTING = "cluster_alerts.management.enabled";
protected final Config config;
@@ -109,12 +115,19 @@ public abstract class Exporter implements AutoCloseable {
private final String name;
private final String type;
private final boolean enabled;
+ private final Settings globalSettings;
private final Settings settings;
+ private final ClusterService clusterService;
+ private final XPackLicenseState licenseState;
- public Config(String name, String type, Settings settings) {
+ public Config(String name, String type, Settings globalSettings, Settings settings,
+ ClusterService clusterService, XPackLicenseState licenseState) {
this.name = name;
this.type = type;
+ this.globalSettings = globalSettings;
this.settings = settings;
+ this.clusterService = clusterService;
+ this.licenseState = licenseState;
this.enabled = settings.getAsBoolean("enabled", true);
}
@@ -130,10 +143,22 @@ public abstract class Exporter implements AutoCloseable {
return enabled;
}
+ public Settings globalSettings() {
+ return globalSettings;
+ }
+
public Settings settings() {
return settings;
}
+ public ClusterService clusterService() {
+ return clusterService;
+ }
+
+ public XPackLicenseState licenseState() {
+ return licenseState;
+ }
+
}
/** A factory for constructing {@link Exporter} instances.*/
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java
index 7945f15e215..f66f7bbc8c1 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java
@@ -9,12 +9,14 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter;
@@ -35,15 +37,20 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable factories;
private final AtomicReference