Configuration : Add ability to configure alerting by creating .alerts/config/<name> documents in the index.

This commit adds the ConfigurationManager which the components use to load configuration from the index.
The configuration manager exposes an isReady method which components should not start until it is returns true.

Original commit: elastic/x-pack-elasticsearch@96a2f9f44f
This commit is contained in:
Brian Murphy 2014-12-01 12:15:32 +00:00
parent 6fd5e5202e
commit 8fa42a581f
8 changed files with 231 additions and 31 deletions

View File

@ -56,6 +56,7 @@ public class AlertManager extends AbstractComponent {
private final ClusterService clusterService;
private final ScriptService scriptService;
private final Client client;
private final ConfigurationManager configurationManager;
private final KeyedLock<String> alertLock = new KeyedLock<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
@ -64,7 +65,8 @@ public class AlertManager extends AbstractComponent {
@Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager,
AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client) {
AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client,
ConfigurationManager configurationManager) {
super(settings);
this.scheduler = scheduler;
this.threadPool = threadPool;
@ -75,10 +77,12 @@ public class AlertManager extends AbstractComponent {
this.actionManager.setAlertManager(this);
this.actionRegistry = actionRegistry;
this.clusterService = clusterService;
this.scriptService = scriptService;
this.client = client;
this.configurationManager = configurationManager;
clusterService.add(new AlertsClusterStateListener());
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled.
indicesService.addLifecycleListener(new LifecycleListener() {
@ -144,6 +148,11 @@ public class AlertManager extends AbstractComponent {
}
}
private void loadSettings() {
Settings indexedSettings = configurationManager.getGlobalConfig();
manuallyStopped = !configurationManager.getOverriddenBooleanValue("alerts.start_immediately", indexedSettings, true);
}
public TriggerResult executeAlert(AlertActionEntry entry) throws IOException {
ensureStarted();
alertLock.acquire(entry.getAlertName());
@ -255,6 +264,14 @@ public class AlertManager extends AbstractComponent {
private void internalStart(ClusterState initialState) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
ClusterState clusterState = initialState;
while(true) {
if (configurationManager.isReady(initialState)) {
loadSettings();
break;
}
clusterState = newClusterState(clusterState);
}
// Try to load alert store before the action manager, b/c action depends on alert store
while (true) {
if (alertsStore.start(clusterState)) {

View File

@ -34,6 +34,7 @@ public class AlertingModule extends AbstractModule {
bind(TriggerManager.class).asEagerSingleton();
bind(AlertScheduler.class).asEagerSingleton();
bind(AlertActionRegistry.class).asEagerSingleton();
bind(ConfigurationManager.class).asEagerSingleton();
// Transport and client layer
bind(TransportPutAlertAction.class).asEagerSingleton();

View File

@ -64,23 +64,22 @@ public class AlertsStore extends AbstractComponent {
private final TemplateHelper templateHelper;
private final ConcurrentMap<String, Alert> alertMap;
private final AlertActionRegistry alertActionRegistry;
private final ConfigurationManager configurationManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final int scrollSize;
private final TimeValue scrollTimeout;
private int scrollSize;
private TimeValue scrollTimeout;
@Inject
public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry,
TriggerManager triggerManager, TemplateHelper templateHelper) {
TriggerManager triggerManager, TemplateHelper templateHelper, ConfigurationManager configurationManager) {
super(settings);
this.client = client;
this.alertActionRegistry = alertActionRegistry;
this.templateHelper = templateHelper;
this.alertMap = ConcurrentCollections.newConcurrentMap();
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
this.triggerManager = triggerManager;
this.configurationManager = configurationManager;
}
/**
@ -123,6 +122,13 @@ public class AlertsStore extends AbstractComponent {
return true;
}
private void loadSettings() {
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
Settings indexedSettings = configurationManager.getGlobalConfig();
this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30));
this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100);
}
/**
* Deletes the alert with the specified name if exists
*/
@ -149,6 +155,12 @@ public class AlertsStore extends AbstractComponent {
return true;
}
if (configurationManager.isReady(state)) {
loadSettings();
} else {
return false;
}
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
logger.debug("Previous alerting index");

View File

@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.IndexMissingException;
import java.util.Map;
/**
*/
public class ConfigurationManager extends AbstractComponent {
private final Client client;
public final String CONFIG_TYPE = "config";
public final String CONFIG_INDEX = AlertsStore.ALERT_INDEX;
private final String GLOBAL_CONFIG_NAME = "global";
private final Settings settings;
private volatile boolean readyToRead = false;
@Inject
public ConfigurationManager(Settings settings, Client client) {
super(settings);
this.client = client;
this.settings = settings;
}
/**
* This method gets the config for a component name
* @param componentName
* @return The immutable settings loaded from the index
*/
public Settings getConfigForComponent(String componentName) {
ensureReady();
try {
client.admin().indices().prepareRefresh(CONFIG_INDEX).get();
} catch (IndexMissingException ime) {
logger.info("No index [" + CONFIG_INDEX + "] found");
return null;
}
GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, componentName).get();
if (!response.isExists()) {
return null;
}
Map<String, Object> sourceMap = response.getSourceAsMap();
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
for (Map.Entry<String, Object> configEntry : sourceMap.entrySet() ) {
settingsBuilder.put(configEntry.getKey(), configEntry.getValue());
}
return settingsBuilder.build();
}
public Settings getGlobalConfig() {
return getConfigForComponent(GLOBAL_CONFIG_NAME);
}
/**
* This method looks in the indexed settings provided for a setting and if it's not there it will go to the
* this.settings and load it from there using the default if not found
*/
public TimeValue getOverriddenTimeValue(String settingName, Settings indexedSettings, TimeValue defaultValue) {
if (indexedSettings == null || indexedSettings.get(settingName) == null) {
return settings.getAsTime(settingName, defaultValue);
} else {
return indexedSettings.getAsTime(settingName, defaultValue);
}
}
public int getOverriddenIntValue(String settingName, Settings indexedSettings, int defaultValue) {
if (indexedSettings == null || indexedSettings.get(settingName) == null) {
return settings.getAsInt(settingName, defaultValue);
} else {
return indexedSettings.getAsInt(settingName, defaultValue);
}
}
public boolean getOverriddenBooleanValue(String settingName, Settings indexedSettings, boolean defaultValue) {
if (indexedSettings == null || indexedSettings.get(settingName) == null) {
return settings.getAsBoolean(settingName, defaultValue);
} else {
return indexedSettings.getAsBoolean(settingName, defaultValue);
}
}
/**
* This method determines if the config manager is ready to start loading configs by checking to make sure the
* config index is in a readable state.
* @param clusterState
* @return true if ready to read or false if not
*/
public boolean isReady(ClusterState clusterState) {
if (readyToRead) {
return true;
} else {
readyToRead = checkIndexState(clusterState);
return readyToRead;
}
}
private void ensureReady() {
if (!readyToRead) {
throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started");
}
}
private boolean checkIndexState(ClusterState clusterState) {
IndexMetaData configIndexMetadata = clusterState.getMetaData().index(CONFIG_INDEX);
if (configIndexMetadata == null) {
logger.info("No previous [" + CONFIG_INDEX + "]");
return true;
} else {
if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) {
logger.info("Index [" + CONFIG_INDEX + "] is started.");
return true;
} else {
return false;
}
}
}
}

View File

@ -67,6 +67,7 @@ public class AlertActionManager extends AbstractComponent {
public static final String ALERT_HISTORY_TYPE = "alerthistory";
private final Client client;
private final ConfigurationManager configurationManager;
private AlertManager alertManager;
private final ThreadPool threadPool;
private final AlertsStore alertsStore;
@ -74,8 +75,8 @@ public class AlertActionManager extends AbstractComponent {
private final TemplateHelper templateHelper;
private final AlertActionRegistry actionRegistry;
private final int scrollSize;
private final TimeValue scrollTimeout;
private int scrollSize;
private TimeValue scrollTimeout;
private final AtomicLong largestQueueSize = new AtomicLong(0);
private final AtomicBoolean started = new AtomicBoolean(false);
@ -85,7 +86,7 @@ public class AlertActionManager extends AbstractComponent {
@Inject
public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry,
ThreadPool threadPool, AlertsStore alertsStore, TriggerManager triggerManager,
TemplateHelper templateHelper) {
TemplateHelper templateHelper, ConfigurationManager configurationManager) {
super(settings);
this.client = client;
this.actionRegistry = actionRegistry;
@ -93,9 +94,14 @@ public class AlertActionManager extends AbstractComponent {
this.alertsStore = alertsStore;
this.triggerManager = triggerManager;
this.templateHelper = templateHelper;
this.configurationManager = configurationManager;
}
private void loadSettings() {
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
Settings indexedSettings = configurationManager.getGlobalConfig();
this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30));
this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100);
}
public void setAlertManager(AlertManager alertManager){
@ -106,6 +112,12 @@ public class AlertActionManager extends AbstractComponent {
if (started.get()) {
return true;
}
if (configurationManager.isReady(state)) {
loadSettings();
} else {
return false;
}
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.info("No previous .alerthistory index, skip loading of alert actions");

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -24,11 +25,11 @@ public class AlertActionRegistry extends AbstractComponent {
private volatile ImmutableOpenMap<String, AlertActionFactory> actionImplemented;
@Inject
public AlertActionRegistry(Settings settings, Client client) {
public AlertActionRegistry(Settings settings, Client client, ConfigurationManager configurationManager) {
super(settings);
actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
.fPut("email", new EmailAlertActionFactory())
.fPut("index", new IndexAlertActionFactory(client))
.fPut("email", new EmailAlertActionFactory(configurationManager))
.fPut("index", new IndexAlertActionFactory(client, configurationManager))
.build();
}

View File

@ -9,7 +9,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -24,11 +26,19 @@ import java.util.Properties;
public class EmailAlertActionFactory implements AlertActionFactory {
// TODO: Move to factory and make configurable
private final int port = 587;
private final String server = "smtp.gmail.com";
private final String from = "esalertingtest@gmail.com";
private final String passwd = "elasticsearchforthewin";
private static final String GLOBAL_EMAIL_CONFIG = "email";
private static final String PORT_SETTING = "server.port";
private static final String SERVER_SETTING = "server.name";
private static final String FROM_SETTING = "from.address";
private static final String PASSWD_SETTING = "from.passwd";
private final ConfigurationManager configurationManager;
public EmailAlertActionFactory(ConfigurationManager configurationManager) {
this.configurationManager = configurationManager;
}
@Override
@ -72,21 +82,28 @@ public class EmailAlertActionFactory implements AlertActionFactory {
throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to EmailAlertActionFactory expected [" + EmailAlertAction.class + "]");
}
EmailAlertAction emailAlertAction = (EmailAlertAction)action;
final Settings emailSettings = configurationManager.getConfigForComponent(GLOBAL_EMAIL_CONFIG);
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.host", server);
props.put("mail.smtp.port", port);
Session session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(from, passwd);
}
});
props.put("mail.smtp.host", emailSettings.get(SERVER_SETTING, "smtp.gmail.com"));
props.put("mail.smtp.port", emailSettings.getAsInt(PORT_SETTING, 587));
Session session;
if (emailSettings.get(PASSWD_SETTING) != null) {
session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(emailSettings.get(FROM_SETTING), emailSettings.get(PASSWD_SETTING));
}
});
} else {
session = Session.getDefaultInstance(props);
}
Message message = new MimeMessage(session);
try {
message.setFrom(new InternetAddress(from));
message.setFrom(new InternetAddress(emailSettings.get(FROM_SETTING)));
message.setRecipients(Message.RecipientType.TO,
emailAlertAction.getEmailAddresses().toArray(new Address[1]));
message.setSubject("Elasticsearch Alert " + alert.getAlertName() + " triggered");

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -23,9 +24,11 @@ import java.io.IOException;
public class IndexAlertActionFactory implements AlertActionFactory {
private final Client client;
private final ConfigurationManager configurationManager;
public IndexAlertActionFactory(Client client){
public IndexAlertActionFactory(Client client, ConfigurationManager configurationManager){
this.client = client;
this.configurationManager = configurationManager;
}
@Override