diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 76f93ae2030..1f689587c33 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -56,6 +56,7 @@ public class AlertManager extends AbstractComponent { private final ClusterService clusterService; private final ScriptService scriptService; private final Client client; + private final ConfigurationManager configurationManager; private final KeyedLock alertLock = new KeyedLock<>(); private final AtomicReference state = new AtomicReference<>(State.STOPPED); @@ -64,7 +65,8 @@ public class AlertManager extends AbstractComponent { @Inject public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager, - AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client) { + AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client, + ConfigurationManager configurationManager) { super(settings); this.scheduler = scheduler; this.threadPool = threadPool; @@ -75,10 +77,12 @@ public class AlertManager extends AbstractComponent { this.actionManager.setAlertManager(this); this.actionRegistry = actionRegistry; this.clusterService = clusterService; + this.scriptService = scriptService; this.client = client; + this.configurationManager = configurationManager; + clusterService.add(new AlertsClusterStateListener()); - manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); // Close if the indices service is being stopped, so we don't run into search failures (locally) that will // happen because we're shutting down and an alert is scheduled. indicesService.addLifecycleListener(new LifecycleListener() { @@ -144,6 +148,11 @@ public class AlertManager extends AbstractComponent { } } + private void loadSettings() { + Settings indexedSettings = configurationManager.getGlobalConfig(); + manuallyStopped = !configurationManager.getOverriddenBooleanValue("alerts.start_immediately", indexedSettings, true); + } + public TriggerResult executeAlert(AlertActionEntry entry) throws IOException { ensureStarted(); alertLock.acquire(entry.getAlertName()); @@ -255,6 +264,14 @@ public class AlertManager extends AbstractComponent { private void internalStart(ClusterState initialState) { if (state.compareAndSet(State.STOPPED, State.STARTING)) { ClusterState clusterState = initialState; + + while(true) { + if (configurationManager.isReady(initialState)) { + loadSettings(); + break; + } + clusterState = newClusterState(clusterState); + } // Try to load alert store before the action manager, b/c action depends on alert store while (true) { if (alertsStore.start(clusterState)) { diff --git a/src/main/java/org/elasticsearch/alerts/AlertingModule.java b/src/main/java/org/elasticsearch/alerts/AlertingModule.java index 32bab823a51..a423ed43e46 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertingModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertingModule.java @@ -34,6 +34,7 @@ public class AlertingModule extends AbstractModule { bind(TriggerManager.class).asEagerSingleton(); bind(AlertScheduler.class).asEagerSingleton(); bind(AlertActionRegistry.class).asEagerSingleton(); + bind(ConfigurationManager.class).asEagerSingleton(); // Transport and client layer bind(TransportPutAlertAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 6ad97c51050..a89b264708c 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -64,23 +64,22 @@ public class AlertsStore extends AbstractComponent { private final TemplateHelper templateHelper; private final ConcurrentMap alertMap; private final AlertActionRegistry alertActionRegistry; + private final ConfigurationManager configurationManager; private final AtomicBoolean started = new AtomicBoolean(false); - private final int scrollSize; - private final TimeValue scrollTimeout; + private int scrollSize; + private TimeValue scrollTimeout; @Inject public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry, - TriggerManager triggerManager, TemplateHelper templateHelper) { + TriggerManager triggerManager, TemplateHelper templateHelper, ConfigurationManager configurationManager) { super(settings); this.client = client; this.alertActionRegistry = alertActionRegistry; this.templateHelper = templateHelper; this.alertMap = ConcurrentCollections.newConcurrentMap(); - // Not using component settings, to let AlertsStore and AlertActionManager share the same settings - this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); - this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); this.triggerManager = triggerManager; + this.configurationManager = configurationManager; } /** @@ -123,6 +122,13 @@ public class AlertsStore extends AbstractComponent { return true; } + private void loadSettings() { + // Not using component settings, to let AlertsStore and AlertActionManager share the same settings + Settings indexedSettings = configurationManager.getGlobalConfig(); + this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30)); + this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100); + } + /** * Deletes the alert with the specified name if exists */ @@ -149,6 +155,12 @@ public class AlertsStore extends AbstractComponent { return true; } + if (configurationManager.isReady(state)) { + loadSettings(); + } else { + return false; + } + IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); if (alertIndexMetaData != null) { logger.debug("Previous alerting index"); diff --git a/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java b/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java new file mode 100644 index 00000000000..6ef4f2737a6 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.IndexMissingException; + +import java.util.Map; + +/** + */ +public class ConfigurationManager extends AbstractComponent { + + private final Client client; + + public final String CONFIG_TYPE = "config"; + public final String CONFIG_INDEX = AlertsStore.ALERT_INDEX; + private final String GLOBAL_CONFIG_NAME = "global"; + private final Settings settings; + private volatile boolean readyToRead = false; + + @Inject + public ConfigurationManager(Settings settings, Client client) { + super(settings); + this.client = client; + this.settings = settings; + } + + /** + * This method gets the config for a component name + * @param componentName + * @return The immutable settings loaded from the index + */ + public Settings getConfigForComponent(String componentName) { + ensureReady(); + try { + client.admin().indices().prepareRefresh(CONFIG_INDEX).get(); + } catch (IndexMissingException ime) { + logger.info("No index [" + CONFIG_INDEX + "] found"); + return null; + } + GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, componentName).get(); + if (!response.isExists()) { + return null; + } + Map sourceMap = response.getSourceAsMap(); + ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder(); + for (Map.Entry configEntry : sourceMap.entrySet() ) { + settingsBuilder.put(configEntry.getKey(), configEntry.getValue()); + } + return settingsBuilder.build(); + } + + public Settings getGlobalConfig() { + return getConfigForComponent(GLOBAL_CONFIG_NAME); + } + + /** + * This method looks in the indexed settings provided for a setting and if it's not there it will go to the + * this.settings and load it from there using the default if not found + */ + public TimeValue getOverriddenTimeValue(String settingName, Settings indexedSettings, TimeValue defaultValue) { + if (indexedSettings == null || indexedSettings.get(settingName) == null) { + return settings.getAsTime(settingName, defaultValue); + } else { + return indexedSettings.getAsTime(settingName, defaultValue); + } + } + + public int getOverriddenIntValue(String settingName, Settings indexedSettings, int defaultValue) { + if (indexedSettings == null || indexedSettings.get(settingName) == null) { + return settings.getAsInt(settingName, defaultValue); + } else { + return indexedSettings.getAsInt(settingName, defaultValue); + } + } + + public boolean getOverriddenBooleanValue(String settingName, Settings indexedSettings, boolean defaultValue) { + if (indexedSettings == null || indexedSettings.get(settingName) == null) { + return settings.getAsBoolean(settingName, defaultValue); + } else { + return indexedSettings.getAsBoolean(settingName, defaultValue); + } + } + + + + /** + * This method determines if the config manager is ready to start loading configs by checking to make sure the + * config index is in a readable state. + * @param clusterState + * @return true if ready to read or false if not + */ + public boolean isReady(ClusterState clusterState) { + if (readyToRead) { + return true; + } else { + readyToRead = checkIndexState(clusterState); + return readyToRead; + } + } + + private void ensureReady() { + if (!readyToRead) { + throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started"); + } + } + + private boolean checkIndexState(ClusterState clusterState) { + IndexMetaData configIndexMetadata = clusterState.getMetaData().index(CONFIG_INDEX); + if (configIndexMetadata == null) { + logger.info("No previous [" + CONFIG_INDEX + "]"); + + return true; + } else { + if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) { + logger.info("Index [" + CONFIG_INDEX + "] is started."); + + return true; + } else { + return false; + } + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 4ccf3b80add..2e80bdf2574 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -67,6 +67,7 @@ public class AlertActionManager extends AbstractComponent { public static final String ALERT_HISTORY_TYPE = "alerthistory"; private final Client client; + private final ConfigurationManager configurationManager; private AlertManager alertManager; private final ThreadPool threadPool; private final AlertsStore alertsStore; @@ -74,8 +75,8 @@ public class AlertActionManager extends AbstractComponent { private final TemplateHelper templateHelper; private final AlertActionRegistry actionRegistry; - private final int scrollSize; - private final TimeValue scrollTimeout; + private int scrollSize; + private TimeValue scrollTimeout; private final AtomicLong largestQueueSize = new AtomicLong(0); private final AtomicBoolean started = new AtomicBoolean(false); @@ -85,7 +86,7 @@ public class AlertActionManager extends AbstractComponent { @Inject public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore, TriggerManager triggerManager, - TemplateHelper templateHelper) { + TemplateHelper templateHelper, ConfigurationManager configurationManager) { super(settings); this.client = client; this.actionRegistry = actionRegistry; @@ -93,9 +94,14 @@ public class AlertActionManager extends AbstractComponent { this.alertsStore = alertsStore; this.triggerManager = triggerManager; this.templateHelper = templateHelper; + this.configurationManager = configurationManager; + } + + private void loadSettings() { // Not using component settings, to let AlertsStore and AlertActionManager share the same settings - this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); - this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); + Settings indexedSettings = configurationManager.getGlobalConfig(); + this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30)); + this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100); } public void setAlertManager(AlertManager alertManager){ @@ -106,6 +112,12 @@ public class AlertActionManager extends AbstractComponent { if (started.get()) { return true; } + if (configurationManager.isReady(state)) { + loadSettings(); + } else { + return false; + } + String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); if (indices.length == 0) { logger.info("No previous .alerthistory index, skip loading of alert actions"); diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java index f190767de53..e45af52bee6 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java @@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.ConfigurationManager; import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -24,11 +25,11 @@ public class AlertActionRegistry extends AbstractComponent { private volatile ImmutableOpenMap actionImplemented; @Inject - public AlertActionRegistry(Settings settings, Client client) { + public AlertActionRegistry(Settings settings, Client client, ConfigurationManager configurationManager) { super(settings); actionImplemented = ImmutableOpenMap.builder() - .fPut("email", new EmailAlertActionFactory()) - .fPut("index", new IndexAlertActionFactory(client)) + .fPut("email", new EmailAlertActionFactory(configurationManager)) + .fPut("index", new IndexAlertActionFactory(client, configurationManager)) .build(); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java index 7eb0c9bee5b..c3225ffb8a7 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java +++ b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertActionFactory.java @@ -9,7 +9,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.ConfigurationManager; import org.elasticsearch.alerts.triggers.TriggerResult; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,11 +26,19 @@ import java.util.Properties; public class EmailAlertActionFactory implements AlertActionFactory { - // TODO: Move to factory and make configurable - private final int port = 587; - private final String server = "smtp.gmail.com"; - private final String from = "esalertingtest@gmail.com"; - private final String passwd = "elasticsearchforthewin"; + private static final String GLOBAL_EMAIL_CONFIG = "email"; + + private static final String PORT_SETTING = "server.port"; + private static final String SERVER_SETTING = "server.name"; + private static final String FROM_SETTING = "from.address"; + private static final String PASSWD_SETTING = "from.passwd"; + + private final ConfigurationManager configurationManager; + + public EmailAlertActionFactory(ConfigurationManager configurationManager) { + this.configurationManager = configurationManager; + } + @Override @@ -72,21 +82,28 @@ public class EmailAlertActionFactory implements AlertActionFactory { throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to EmailAlertActionFactory expected [" + EmailAlertAction.class + "]"); } EmailAlertAction emailAlertAction = (EmailAlertAction)action; + final Settings emailSettings = configurationManager.getConfigForComponent(GLOBAL_EMAIL_CONFIG); Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.starttls.enable", "true"); - props.put("mail.smtp.host", server); - props.put("mail.smtp.port", port); - Session session = Session.getInstance(props, - new javax.mail.Authenticator() { - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication(from, passwd); - } - }); + props.put("mail.smtp.host", emailSettings.get(SERVER_SETTING, "smtp.gmail.com")); + props.put("mail.smtp.port", emailSettings.getAsInt(PORT_SETTING, 587)); + Session session; + if (emailSettings.get(PASSWD_SETTING) != null) { + session = Session.getInstance(props, + new javax.mail.Authenticator() { + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(emailSettings.get(FROM_SETTING), emailSettings.get(PASSWD_SETTING)); + } + }); + } else { + session = Session.getDefaultInstance(props); + } + Message message = new MimeMessage(session); try { - message.setFrom(new InternetAddress(from)); + message.setFrom(new InternetAddress(emailSettings.get(FROM_SETTING))); message.setRecipients(Message.RecipientType.TO, emailAlertAction.getEmailAddresses().toArray(new Address[1])); message.setSubject("Elasticsearch Alert " + alert.getAlertName() + " triggered"); diff --git a/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java b/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java index 90d08270f3d..51a3dc285f8 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java +++ b/src/main/java/org/elasticsearch/alerts/actions/IndexAlertActionFactory.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.ConfigurationManager; import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -23,9 +24,11 @@ import java.io.IOException; public class IndexAlertActionFactory implements AlertActionFactory { private final Client client; + private final ConfigurationManager configurationManager; - public IndexAlertActionFactory(Client client){ + public IndexAlertActionFactory(Client client, ConfigurationManager configurationManager){ this.client = client; + this.configurationManager = configurationManager; } @Override