From 8fa42a581f0b2d1a9f7f7817c9984b9d02dffdd3 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Mon, 1 Dec 2014 12:15:32 +0000 Subject: [PATCH] Configuration : Add ability to configure alerting by creating .alerts/config/ documents in the index. This commit adds the ConfigurationManager which the components use to load configuration from the index. The configuration manager exposes an isReady method which components should not start until it is returns true. Original commit: elastic/x-pack-elasticsearch@96a2f9f44f738e7e176ed4f586addc742637079d --- .../elasticsearch/alerts/AlertManager.java | 21 ++- .../elasticsearch/alerts/AlertingModule.java | 1 + .../org/elasticsearch/alerts/AlertsStore.java | 24 ++- .../alerts/ConfigurationManager.java | 137 ++++++++++++++++++ .../alerts/actions/AlertActionManager.java | 22 ++- .../alerts/actions/AlertActionRegistry.java | 7 +- .../actions/EmailAlertActionFactory.java | 45 ++++-- .../actions/IndexAlertActionFactory.java | 5 +- 8 files changed, 231 insertions(+), 31 deletions(-) create mode 100644 src/main/java/org/elasticsearch/alerts/ConfigurationManager.java diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 76f93ae2030..1f689587c33 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -56,6 +56,7 @@ public class AlertManager extends AbstractComponent { private final ClusterService clusterService; private final ScriptService scriptService; private final Client client; + private final ConfigurationManager configurationManager; private final KeyedLock alertLock = new KeyedLock<>(); private final AtomicReference state = new AtomicReference<>(State.STOPPED); @@ -64,7 +65,8 @@ public class AlertManager extends AbstractComponent { @Inject public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager, - AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client) { + AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client, + ConfigurationManager configurationManager) { super(settings); this.scheduler = scheduler; this.threadPool = threadPool; @@ -75,10 +77,12 @@ public class AlertManager extends AbstractComponent { this.actionManager.setAlertManager(this); this.actionRegistry = actionRegistry; this.clusterService = clusterService; + this.scriptService = scriptService; this.client = client; + this.configurationManager = configurationManager; + clusterService.add(new AlertsClusterStateListener()); - manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); // Close if the indices service is being stopped, so we don't run into search failures (locally) that will // happen because we're shutting down and an alert is scheduled. indicesService.addLifecycleListener(new LifecycleListener() { @@ -144,6 +148,11 @@ public class AlertManager extends AbstractComponent { } } + private void loadSettings() { + Settings indexedSettings = configurationManager.getGlobalConfig(); + manuallyStopped = !configurationManager.getOverriddenBooleanValue("alerts.start_immediately", indexedSettings, true); + } + public TriggerResult executeAlert(AlertActionEntry entry) throws IOException { ensureStarted(); alertLock.acquire(entry.getAlertName()); @@ -255,6 +264,14 @@ public class AlertManager extends AbstractComponent { private void internalStart(ClusterState initialState) { if (state.compareAndSet(State.STOPPED, State.STARTING)) { ClusterState clusterState = initialState; + + while(true) { + if (configurationManager.isReady(initialState)) { + loadSettings(); + break; + } + clusterState = newClusterState(clusterState); + } // Try to load alert store before the action manager, b/c action depends on alert store while (true) { if (alertsStore.start(clusterState)) { diff --git a/src/main/java/org/elasticsearch/alerts/AlertingModule.java b/src/main/java/org/elasticsearch/alerts/AlertingModule.java index 32bab823a51..a423ed43e46 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertingModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertingModule.java @@ -34,6 +34,7 @@ public class AlertingModule extends AbstractModule { bind(TriggerManager.class).asEagerSingleton(); bind(AlertScheduler.class).asEagerSingleton(); bind(AlertActionRegistry.class).asEagerSingleton(); + bind(ConfigurationManager.class).asEagerSingleton(); // Transport and client layer bind(TransportPutAlertAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 6ad97c51050..a89b264708c 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -64,23 +64,22 @@ public class AlertsStore extends AbstractComponent { private final TemplateHelper templateHelper; private final ConcurrentMap alertMap; private final AlertActionRegistry alertActionRegistry; + private final ConfigurationManager configurationManager; private final AtomicBoolean started = new AtomicBoolean(false); - private final int scrollSize; - private final TimeValue scrollTimeout; + private int scrollSize; + private TimeValue scrollTimeout; @Inject public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry, - TriggerManager triggerManager, TemplateHelper templateHelper) { + TriggerManager triggerManager, TemplateHelper templateHelper, ConfigurationManager configurationManager) { super(settings); this.client = client; this.alertActionRegistry = alertActionRegistry; this.templateHelper = templateHelper; this.alertMap = ConcurrentCollections.newConcurrentMap(); - // Not using component settings, to let AlertsStore and AlertActionManager share the same settings - this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); - this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); this.triggerManager = triggerManager; + this.configurationManager = configurationManager; } /** @@ -123,6 +122,13 @@ public class AlertsStore extends AbstractComponent { return true; } + private void loadSettings() { + // Not using component settings, to let AlertsStore and AlertActionManager share the same settings + Settings indexedSettings = configurationManager.getGlobalConfig(); + this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30)); + this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100); + } + /** * Deletes the alert with the specified name if exists */ @@ -149,6 +155,12 @@ public class AlertsStore extends AbstractComponent { return true; } + if (configurationManager.isReady(state)) { + loadSettings(); + } else { + return false; + } + IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); if (alertIndexMetaData != null) { logger.debug("Previous alerting index"); diff --git a/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java b/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java new file mode 100644 index 00000000000..6ef4f2737a6 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.IndexMissingException; + +import java.util.Map; + +/** + */ +public class ConfigurationManager extends AbstractComponent { + + private final Client client; + + public final String CONFIG_TYPE = "config"; + public final String CONFIG_INDEX = AlertsStore.ALERT_INDEX; + private final String GLOBAL_CONFIG_NAME = "global"; + private final Settings settings; + private volatile boolean readyToRead = false; + + @Inject + public ConfigurationManager(Settings settings, Client client) { + super(settings); + this.client = client; + this.settings = settings; + } + + /** + * This method gets the config for a component name + * @param componentName + * @return The immutable settings loaded from the index + */ + public Settings getConfigForComponent(String componentName) { + ensureReady(); + try { + client.admin().indices().prepareRefresh(CONFIG_INDEX).get(); + } catch (IndexMissingException ime) { + logger.info("No index [" + CONFIG_INDEX + "] found"); + return null; + } + GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, componentName).get(); + if (!response.isExists()) { + return null; + } + Map sourceMap = response.getSourceAsMap(); + ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder(); + for (Map.Entry configEntry : sourceMap.entrySet() ) { + settingsBuilder.put(configEntry.getKey(), configEntry.getValue()); + } + return settingsBuilder.build(); + } + + public Settings getGlobalConfig() { + return getConfigForComponent(GLOBAL_CONFIG_NAME); + } + + /** + * This method looks in the indexed settings provided for a setting and if it's not there it will go to the + * this.settings and load it from there using the default if not found + */ + public TimeValue getOverriddenTimeValue(String settingName, Settings indexedSettings, TimeValue defaultValue) { + if (indexedSettings == null || indexedSettings.get(settingName) == null) { + return settings.getAsTime(settingName, defaultValue); + } else { + return indexedSettings.getAsTime(settingName, defaultValue); + } + } + + public int getOverriddenIntValue(String settingName, Settings indexedSettings, int defaultValue) { + if (indexedSettings == null || indexedSettings.get(settingName) == null) { + return settings.getAsInt(settingName, defaultValue); + } else { + return indexedSettings.getAsInt(settingName, defaultValue); + } + } + + public boolean getOverriddenBooleanValue(String settingName, Settings indexedSettings, boolean defaultValue) { + if (indexedSettings == null || indexedSettings.get(settingName) == null) { + return settings.getAsBoolean(settingName, defaultValue); + } else { + return indexedSettings.getAsBoolean(settingName, defaultValue); + } + } + + + + /** + * This method determines if the config manager is ready to start loading configs by checking to make sure the + * config index is in a readable state. + * @param clusterState + * @return true if ready to read or false if not + */ + public boolean isReady(ClusterState clusterState) { + if (readyToRead) { + return true; + } else { + readyToRead = checkIndexState(clusterState); + return readyToRead; + } + } + + private void ensureReady() { + if (!readyToRead) { + throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started"); + } + } + + private boolean checkIndexState(ClusterState clusterState) { + IndexMetaData configIndexMetadata = clusterState.getMetaData().index(CONFIG_INDEX); + if (configIndexMetadata == null) { + logger.info("No previous [" + CONFIG_INDEX + "]"); + + return true; + } else { + if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) { + logger.info("Index [" + CONFIG_INDEX + "] is started."); + + return true; + } else { + return false; + } + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 4ccf3b80add..2e80bdf2574 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -67,6 +67,7 @@ public class AlertActionManager extends AbstractComponent { public static final String ALERT_HISTORY_TYPE = "alerthistory"; private final Client client; + private final ConfigurationManager configurationManager; private AlertManager alertManager; private final ThreadPool threadPool; private final AlertsStore alertsStore; @@ -74,8 +75,8 @@ public class AlertActionManager extends AbstractComponent { private final TemplateHelper templateHelper; private final AlertActionRegistry actionRegistry; - private final int scrollSize; - private final TimeValue scrollTimeout; + private int scrollSize; + private TimeValue scrollTimeout; private final AtomicLong largestQueueSize = new AtomicLong(0); private final AtomicBoolean started = new AtomicBoolean(false); @@ -85,7 +86,7 @@ public class AlertActionManager extends AbstractComponent { @Inject public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore, TriggerManager triggerManager, - TemplateHelper templateHelper) { + TemplateHelper templateHelper, ConfigurationManager configurationManager) { super(settings); this.client = client; this.actionRegistry = actionRegistry; @@ -93,9 +94,14 @@ public class AlertActionManager extends AbstractComponent { this.alertsStore = alertsStore; this.triggerManager = triggerManager; this.templateHelper = templateHelper; + this.configurationManager = configurationManager; + } + + private void loadSettings() { // Not using component settings, to let AlertsStore and AlertActionManager share the same settings - this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); - this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); + Settings indexedSettings = configurationManager.getGlobalConfig(); + this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30)); + this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100); } public void setAlertManager(AlertManager alertManager){ @@ -106,6 +112,12 @@ public class AlertActionManager extends AbstractComponent { if (started.get()) { return true; } + if (configurationManager.isReady(state)) { + loadSettings(); + } else { + return false; + } + String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); if (indices.length == 0) { logger.info("No previous .alerthistory index, skip loading of alert actions"); diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java index f190767de53..e45af52bee6 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java @@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.ConfigurationManager; import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -24,11 +25,11 @@ public class AlertActionRegistry extends AbstractComponent { private volatile ImmutableOpenMap actionImplemented; @Inject - public AlertActionRegistry(Settings settings, Client client) { + public AlertActionRegistry(Settings settings, Client client, ConfigurationManager configurationManager) { super(settings); actionImplemented = ImmutableOpenMap.builder() - .fPut("email", new EmailAlertActionFactory()) - .fPut("index", new IndexAlertActionFactory(client)) + .fPut("email", new EmailAlertActionFactory(configurationManager)) + .fPut("index", new IndexAlertActionFactory(client, configurationManager)) .build(); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java index 7eb0c9bee5b..c3225ffb8a7 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java +++ b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java @@ -9,7 +9,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.ConfigurationManager; import org.elasticsearch.alerts.triggers.TriggerResult; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,11 +26,19 @@ import java.util.Properties; public class EmailAlertActionFactory implements AlertActionFactory { - // TODO: Move to factory and make configurable - private final int port = 587; - private final String server = "smtp.gmail.com"; - private final String from = "esalertingtest@gmail.com"; - private final String passwd = "elasticsearchforthewin"; + private static final String GLOBAL_EMAIL_CONFIG = "email"; + + private static final String PORT_SETTING = "server.port"; + private static final String SERVER_SETTING = "server.name"; + private static final String FROM_SETTING = "from.address"; + private static final String PASSWD_SETTING = "from.passwd"; + + private final ConfigurationManager configurationManager; + + public EmailAlertActionFactory(ConfigurationManager configurationManager) { + this.configurationManager = configurationManager; + } + @Override @@ -72,21 +82,28 @@ public class EmailAlertActionFactory implements AlertActionFactory { throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to EmailAlertActionFactory expected [" + EmailAlertAction.class + "]"); } EmailAlertAction emailAlertAction = (EmailAlertAction)action; + final Settings emailSettings = configurationManager.getConfigForComponent(GLOBAL_EMAIL_CONFIG); Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.starttls.enable", "true"); - props.put("mail.smtp.host", server); - props.put("mail.smtp.port", port); - Session session = Session.getInstance(props, - new javax.mail.Authenticator() { - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication(from, passwd); - } - }); + props.put("mail.smtp.host", emailSettings.get(SERVER_SETTING, "smtp.gmail.com")); + props.put("mail.smtp.port", emailSettings.getAsInt(PORT_SETTING, 587)); + Session session; + if (emailSettings.get(PASSWD_SETTING) != null) { + session = Session.getInstance(props, + new javax.mail.Authenticator() { + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(emailSettings.get(FROM_SETTING), emailSettings.get(PASSWD_SETTING)); + } + }); + } else { + session = Session.getDefaultInstance(props); + } + Message message = new MimeMessage(session); try { - message.setFrom(new InternetAddress(from)); + message.setFrom(new InternetAddress(emailSettings.get(FROM_SETTING))); message.setRecipients(Message.RecipientType.TO, emailAlertAction.getEmailAddresses().toArray(new Address[1])); message.setSubject("Elasticsearch Alert " + alert.getAlertName() + " triggered"); diff --git a/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java b/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java index 90d08270f3d..51a3dc285f8 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java +++ b/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.ConfigurationManager; import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -23,9 +24,11 @@ import java.io.IOException; public class IndexAlertActionFactory implements AlertActionFactory { private final Client client; + private final ConfigurationManager configurationManager; - public IndexAlertActionFactory(Client client){ + public IndexAlertActionFactory(Client client, ConfigurationManager configurationManager){ this.client = client; + this.configurationManager = configurationManager; } @Override