Introduced AlertsStore that is responsible for maintaining / storing / parsing etc of alerts

Original commit: elastic/x-pack-elasticsearch@40aae7dc30
This commit is contained in:
Martijn van Groningen 2014-10-28 18:49:23 +01:00
parent 34c359281a
commit 6b2fbe400e
8 changed files with 413 additions and 477 deletions

View File

@ -15,7 +15,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public class Alert implements ToXContent{
public class Alert implements ToXContent {
private final String alertName;
private String queryName;
private AlertTrigger trigger;
@ -162,18 +163,18 @@ public class Alert implements ToXContent{
//Note we deliberately don't serialize the version here
builder.startObject();
builder.field(AlertManager.QUERY_FIELD.getPreferredName(), queryName);
builder.field(AlertManager.SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(AlertManager.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
builder.field(AlertManager.LASTRAN_FIELD.getPreferredName(), lastRan);
builder.field(AlertManager.CURRENTLY_RUNNING.getPreferredName(), running);
builder.field(AlertManager.ENABLED.getPreferredName(), enabled);
builder.field(AlertManager.SIMPLE_QUERY.getPreferredName(), simpleQuery);
builder.field(AlertManager.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
builder.field(AlertsStore.QUERY_FIELD.getPreferredName(), queryName);
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
builder.field(AlertsStore.LASTRAN_FIELD.getPreferredName(), lastRan);
builder.field(AlertsStore.CURRENTLY_RUNNING.getPreferredName(), running);
builder.field(AlertsStore.ENABLED.getPreferredName(), enabled);
builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery);
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
builder.field(AlertManager.TRIGGER_FIELD.getPreferredName());
builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder, params);
builder.field(AlertManager.ACTION_FIELD.getPreferredName());
builder.field(AlertsStore.ACTION_FIELD.getPreferredName());
builder.startObject();
for (AlertAction action : actions){
@ -183,7 +184,7 @@ public class Alert implements ToXContent{
builder.endObject();
if (indices != null && !indices.isEmpty()) {
builder.field(AlertManager.INDICES.getPreferredName());
builder.field(AlertsStore.INDICES.getPreferredName());
builder.startArray();
for (String index : indices){
builder.value(index);

View File

@ -7,150 +7,67 @@ package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
public class AlertManager extends AbstractLifecycleComponent {
// TODO: Add lock synchronization via KeyedLock so that we can lock concurrent operations for the same alert.
// For example 2 concurrent deletes for the same alert, so that at least one fails, but not that 2 deletes are half done.
// The KeyedLock make sure that we only lock on the same alert, but not on different alerts.
public class AlertManager extends AbstractComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public static final ParseField ACTION_FIELD = new ParseField("action");
public static final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField CURRENTLY_RUNNING = new ParseField("running");
public static final ParseField ENABLED = new ParseField("enabled");
public static final ParseField SIMPLE_QUERY = new ParseField("simple");
public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield");
public static final ParseField LAST_ACTION_FIRE = new ParseField("lastactionfire");
private final Client client;
private AlertScheduler scheduler;
private final ThreadPool threadPool;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean startActions = new AtomicBoolean(false);
private final ConcurrentMap<String,Alert> alertMap;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean startActions = new AtomicBoolean(false);
private AlertActionRegistry actionRegistry;
private AlertActionManager actionManager;
final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
private void sendAlertsToScheduler() {
for (Map.Entry<String, Alert> entry : alertMap.entrySet()) {
scheduler.addAlert(entry.getKey(), entry.getValue());
}
}
@Override
protected void doStart() throws ElasticsearchException {
logger.warn("STARTING");
}
@Override
protected void doStop() throws ElasticsearchException {
logger.warn("STOPPING");
}
@Override
protected void doClose() throws ElasticsearchException {
logger.warn("CLOSING");
}
private AlertScheduler scheduler;
private final AlertsStore alertsStore;
private final AlertActionRegistry actionRegistry;
private final AlertActionManager actionManager;
@Inject
public AlertManager(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
AlertActionRegistry actionRegistry) {
AlertActionRegistry actionRegistry, AlertsStore alertsStore) {
super(settings);
logger.warn("Initing AlertManager");
this.client = client;
alertMap = ConcurrentCollections.newConcurrentMap();
clusterService.add(new AlertsClusterStateListener());
this.threadPool = threadPool;
this.actionRegistry = actionRegistry;
this.actionManager = new AlertActionManager(client, this, actionRegistry, threadPool);
this.alertsStore = alertsStore;
clusterService.add(new AlertsClusterStateListener());
}
public void setAlertScheduler(AlertScheduler scheduler){
this.scheduler = scheduler;
}
private ClusterHealthStatus createAlertsIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
}
public DateTime timeActionLastTriggered(String alertName) {
Alert indexedAlert;
indexedAlert = getAlertFromIndex(alertName);
if (indexedAlert == null) {
return null;
} else {
return indexedAlert.lastActionFire();
}
}
public void doAction(Alert alert, AlertActionEntry result, DateTime scheduledTime) {
ensureStarted();
logger.warn("We have triggered");
DateTime lastActionFire = timeActionLastTriggered(alert.alertName());
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
@ -164,352 +81,72 @@ public class AlertManager extends AbstractLifecycleComponent {
logger.warn("Did action !");
alert.lastActionFire(scheduledTime);
persistAlert(alert.alertName(), alert, IndexRequest.OpType.INDEX);
alertsStore.updateAlert(alert);
}
}
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert indexedAlert;
ensureStarted();
Alert alert;
try {
indexedAlert = getAlertFromIndex(alertName);
Alert inMemoryAlert = alertMap.get(alertName);
if (indexedAlert == null) {
//Alert has been deleted out from underneath us
alertMap.remove(alertName);
return false;
} else if (inMemoryAlert == null) {
logger.warn("Got claim attempt for alert [{}] that alert manager does not have but is in the index.", alertName);
alertMap.put(alertName, indexedAlert); //This is an odd state to get into
} else {
if (!inMemoryAlert.isSameAlert(indexedAlert)) {
alertMap.put(alertName, indexedAlert); //Probably has been changed by another process and we missed the notification
}
}
if (!indexedAlert.enabled()) {
return false;
}
if (indexedAlert.running().equals(scheduleRunTime) || indexedAlert.running().isAfter(scheduleRunTime)) {
//Someone else is already running this alert or this alert time has passed
alert = alertsStore.getAlert(alertName);
if (!alert.enabled()) {
return false;
}
} catch (Throwable t) {
throw new ElasticsearchException("Unable to load alert from index",t);
}
indexedAlert.running(scheduleRunTime);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
updateRequest.id(alertName);
updateRequest.version(indexedAlert.version());//Since we loaded this alert directly from the index the version should be correct
XContentBuilder alertBuilder;
try {
alertBuilder = XContentFactory.jsonBuilder();
indexedAlert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert ["+ alertName + "]", ie);
}
updateRequest.doc(alertBuilder);
updateRequest.retryOnConflict(0);
try {
client.update(updateRequest).actionGet();
} catch (ElasticsearchException ee) {
logger.error("Failed to update in claim", ee);
return false;
}
Alert alert = alertMap.get(alertName);
if (alert != null) {
alert.running(scheduleRunTime);
}
alert.running(scheduleRunTime);
alertsStore.updateAlert(alert);
return true;
}
private Alert getAlertFromIndex(String alertName) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
GetRequest getRequest = Requests.getRequest(ALERT_INDEX);
getRequest.type(ALERT_TYPE);
getRequest.id(alertName);
GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) {
return parseAlert(alertName, getResponse.getSourceAsMap(), getResponse.getVersion());
} else {
throw new ElasticsearchException("Unable to find [" + alertName + "] in the [" +ALERT_INDEX + "]" );
}
}
public void refreshAlerts() {
public void clearAndReload() {
ensureStarted();
try {
synchronized (alertMap) {
scheduler.clearAlerts();
alertMap.clear();
loadAlerts();
sendAlertsToScheduler();
}
scheduler.clearAlerts();
alertsStore.reload();
sendAlertsToScheduler();
} catch (Exception e){
throw new ElasticsearchException("Failed to refresh alerts",e);
}
}
private void loadAlerts() {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " +
"{ \"match_all\" : {}}," +
"\"size\" : \"100\"" +
"}"
).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().actionGet();
for (SearchHit sh : searchResponse.getHits()) {
String alertId = sh.getId();
try {
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
} catch (ElasticsearchException e) {
logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh);
}
}
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());
}
public long getLastEventCount(String alertName){
return 0;
}
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime) throws Exception {
try {
Alert alert = getAlertForName(alertName);
alert.lastRan(fireTime);
XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true));
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.id(alertName);
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
ensureStarted();
Alert alert = alertsStore.getAlert(alertName);
alert.lastRan(fireTime);
alertsStore.updateAlert(alert);
return true;
}
updateRequest.doc(alertBuilder);
updateRequest.refresh(true);
client.update(updateRequest).actionGet();
public boolean deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted();
if (alertsStore.hasAlert(name)) {
assert scheduler.deleteAlertFromSchedule(name);
alertsStore.deleteAlert(name);
return true;
} catch (Throwable t) {
logger.error("Failed to update alert [{}] with lastRan of [{}]",t, alertName, fireTime);
} else {
return false;
}
}
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
if (alertMap.remove(alertName) != null) {
scheduler.deleteAlertFromSchedule(alertName);
try {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.id(alertName);
deleteRequest.index(ALERT_INDEX);
deleteRequest.type(ALERT_TYPE);
deleteRequest.operationThreaded(false);
deleteRequest.refresh(true);
if (client.delete(deleteRequest).actionGet().isFound()) {
return true;
} else {
logger.warn("Couldn't find [{}] in the index triggering a full refresh", alertName);
//Something went wrong refresh
refreshAlerts();
return false;
}
}
catch (Exception e){
logger.warn("Something went wrong when deleting [{}] from the index triggering a full refresh", e, alertName);
//Something went wrong refresh
refreshAlerts();
throw e;
}
}
return false;
}
public boolean addAlert(String alertName, Alert alert, boolean persist) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
if (alertMap.putIfAbsent(alertName, alert) == null) {
scheduler.addAlert(alertName, alert);
if (persist) {
return persistAlert(alertName, alert, IndexRequest.OpType.CREATE);
} else {
return true;
}
} else {
throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]");
}
}
public boolean disableAlert(String alertName) {
Alert alert = alertMap.get(alertName);
if (alert == null) {
throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]");
}
alert.enabled(false);
return persistAlert(alertName, alert, IndexRequest.OpType.INDEX);
}
public boolean enableAlert(String alertName) {
Alert alert = alertMap.get(alertName);
if (alert == null) {
throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]");
}
alert.enabled(true);
return persistAlert(alertName, alert, IndexRequest.OpType.INDEX);
}
private boolean persistAlert(String alertName, Alert alert, IndexRequest.OpType opType) {
XContentBuilder builder;
try {
builder = XContentFactory.jsonBuilder();
alert.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.source(builder);
indexRequest.opType(opType);
IndexResponse indexResponse = client.index(indexRequest).actionGet();
//@TODO : broadcast refresh here
if (opType.equals(IndexRequest.OpType.CREATE)) {
return indexResponse.isCreated();
} else {
return true;
}
} catch (IOException ie) {
throw new ElasticsearchIllegalStateException("Unable to convert alert to JSON", ie);
}
}
private Alert parseAlert(String alertId, SearchHit sh) {
Map<String, Object> fields = sh.sourceAsMap();
return parseAlert(alertId, fields, sh.getVersion());
}
public Alert parseAlert(String alertId, Map<String, Object> fields) {
return parseAlert(alertId, fields, -1);
}
public Alert parseAlert(String alertId, Map<String, Object> fields, long version ) {
logger.warn("Parsing : [{}]", alertId);
String query = fields.get(QUERY_FIELD.getPreferredName()).toString();
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString();
Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName());
AlertTrigger trigger = null;
if (triggerObj instanceof Map) {
Map<String, Object> triggerMap = (Map<String, Object>) triggerObj;
trigger = TriggerManager.parseTriggerFromMap(triggerMap);
} else {
throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]");
}
String timeString = fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString();
TimeValue timePeriod = TimeValue.parseTimeValue(timeString, defaultTimePeriod);
Object actionObj = fields.get(ACTION_FIELD.getPreferredName());
List<AlertAction> actions = null;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = actionRegistry.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]");
}
DateTime lastRan = new DateTime(0);
if( fields.get(LASTRAN_FIELD.getPreferredName()) != null){
lastRan = new DateTime(fields.get(LASTRAN_FIELD.getPreferredName()).toString());
} else if (fields.get("lastRan") != null) {
lastRan = new DateTime(fields.get("lastRan").toString());
}
DateTime running = new DateTime(0);
if (fields.get(CURRENTLY_RUNNING.getPreferredName()) != null) {
running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString());
}
DateTime lastActionFire = new DateTime(0);
if (fields.get(LAST_ACTION_FIRE.getPreferredName()) != null) {
lastActionFire = new DateTime(fields.get(LAST_ACTION_FIRE.getPreferredName()).toString());
}
List<String> indices = new ArrayList<>();
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){
indices = (List<String>)fields.get(INDICES.getPreferredName());
} else {
logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() );
}
boolean enabled = true;
if (fields.get(ENABLED.getPreferredName()) != null ) {
logger.error(ENABLED.getPreferredName() + " " + fields.get(ENABLED.getPreferredName()));
Object enabledObj = fields.get(ENABLED.getPreferredName());
enabled = parseAsBoolean(enabledObj);
}
boolean simpleQuery = true;
if (fields.get(SIMPLE_QUERY.getPreferredName()) != null ) {
logger.error(SIMPLE_QUERY.getPreferredName() + " " + fields.get(SIMPLE_QUERY.getPreferredName()));
Object enabledObj = fields.get(SIMPLE_QUERY.getPreferredName());
simpleQuery = parseAsBoolean(enabledObj);
}
Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled, simpleQuery);
alert.lastActionFire(lastActionFire);
if (fields.get(TIMESTAMP_FIELD.getPreferredName()) != null) {
alert.timestampString(fields.get(TIMESTAMP_FIELD.getPreferredName()).toString());
}
public Alert addAlert(String alertName, BytesReference alertSource) {
ensureStarted();
Alert alert = alertsStore.createAlert(alertName, alertSource);
scheduler.addAlert(alertName, alert);
return alert;
}
private boolean parseAsBoolean(Object enabledObj) {
boolean enabled;
if (enabledObj instanceof Boolean){
enabled = (Boolean)enabledObj;
} else {
if (enabledObj.toString().toLowerCase(Locale.ROOT).equals("true") ||
enabledObj.toString().toLowerCase(Locale.ROOT).equals("1")) {
enabled = true;
} else if ( enabledObj.toString().toLowerCase(Locale.ROOT).equals("false") ||
enabledObj.toString().toLowerCase(Locale.ROOT).equals("0")) {
enabled = false;
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + enabledObj + "] as a boolean");
}
}
return enabled;
}
public Map<String,Alert> getSafeAlertMap() {
return ImmutableMap.copyOf(alertMap);
}
public Alert getAlertForName(String alertName) {
return alertMap.get(alertName);
ensureStarted();
return alertsStore.getAlert(alertName);
}
public List<Alert> getAllAlerts() {
ensureStarted();
return ImmutableList.copyOf(alertsStore.getAlerts().values());
}
public boolean isStarted() {
@ -522,6 +159,27 @@ public class AlertManager extends AbstractLifecycleComponent {
return actionManager.addHistory(alertName, isTriggered, dateTime, scheduledTime, srb, trigger, totalHits, actions, indices);
}
private void ensureStarted() {
if (!started.get() || !startActions.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
}
private void sendAlertsToScheduler() {
for (Map.Entry<String, Alert> entry : alertsStore.getAlerts().entrySet()) {
scheduler.addAlert(entry.getKey(), entry.getValue());
}
}
private DateTime timeActionLastTriggered(String alertName) {
Alert alert = alertsStore.getAlert(alertName);
if (alert != null) {
return alert.lastActionFire();
} else {
return null;
}
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
@ -529,7 +187,7 @@ public class AlertManager extends AbstractLifecycleComponent {
if (!event.localNodeMaster()) { //We are not the master
if (started.compareAndSet(false, true)) {
scheduler.clearAlerts();
alertMap.clear();
alertsStore.clear();
}
if (startActions.compareAndSet(false, true)) {
@ -570,7 +228,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public void run() {
// TODO: have some kind of retry mechanism?
try {
loadAlerts();
alertsStore.reload();
sendAlertsToScheduler();
} catch (Exception e) {
logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually");

View File

@ -15,6 +15,7 @@ public class AlertingModule extends AbstractModule {
@Override
protected void configure() {
bind(AlertsStore.class).asEagerSingleton();
bind(AlertManager.class).asEagerSingleton();
bind(TriggerManager.class).asEagerSingleton();
bind(AlertScheduler.class).asEagerSingleton();

View File

@ -0,0 +1,302 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
*/
public class AlertsStore extends AbstractComponent {
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public static final ParseField ACTION_FIELD = new ParseField("action");
public static final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField CURRENTLY_RUNNING = new ParseField("running");
public static final ParseField ENABLED = new ParseField("enabled");
public static final ParseField SIMPLE_QUERY = new ParseField("simple");
public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield");
public static final ParseField LAST_ACTION_FIRE = new ParseField("lastactionfire");
private final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
private final Client client;
private final AlertActionRegistry alertActionRegistry;
private final ConcurrentMap<String,Alert> alertMap;
@Inject
public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry) {
super(settings);
this.client = client;
this.alertActionRegistry = alertActionRegistry;
alertMap = ConcurrentCollections.newConcurrentMap();
}
/**
* Returns whether an alert with the specified name exists.
*/
public boolean hasAlert(String name) {
return alertMap.containsKey(name);
}
/**
* Returns the alert with the specified name otherwise <code>null</code> is returned.
*/
public Alert getAlert(String name) {
return alertMap.get(name);
}
/**
* Creates an alert with the specified and fails if an alert with the name already exists.
*/
public Alert createAlert(String name, BytesReference alertSource) {
if (!client.admin().indices().prepareExists(AlertManager.ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
Alert alert = parseAlert(name, alertSource, 1);
if (alertMap.putIfAbsent(name, alert) == null) {
persistAlert(name, alertSource, IndexRequest.OpType.CREATE);
} else {
throw new ElasticsearchIllegalArgumentException("There is already an alert named [" + name + "]");
}
return alert;
}
/**
* Updates the specified alert by making sure that the made changes are persisted.
*/
public void updateAlert(Alert alert) {
IndexRequest updateRequest = new IndexRequest();
updateRequest.index(AlertManager.ALERT_INDEX);
updateRequest.type(AlertManager.ALERT_TYPE);
updateRequest.id(alert.alertName());
updateRequest.version(alert.version());
XContentBuilder alertBuilder;
try {
alertBuilder = XContentFactory.jsonBuilder();
alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert ["+ alert.alertName() + "]", ie);
}
updateRequest.source(alertBuilder);
IndexResponse response = client.index(updateRequest).actionGet();
alert.version(response.getVersion());
// Don't need to update the alertMap, since we are working on an instance from it.
assert alertMap.get(alert.alertName()) == alert;
}
/**
* Deletes the alert with the specified name if exists
*/
public void deleteAlert(String name) {
Alert alert = alertMap.remove(name);
if (alert != null) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.id(name);
deleteRequest.index(AlertManager.ALERT_INDEX);
deleteRequest.type(AlertManager.ALERT_TYPE);
deleteRequest.version(alert.version());
DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
assert deleteResponse.isFound();
}
}
/**
* Clears the in-memory representation of the alerts and loads the alerts from the .alerts index.
*/
public void reload() {
clear();
loadAlerts();
}
/**
* Clears the in-memory representation of the alerts
*/
public void clear() {
alertMap.clear();
}
public ConcurrentMap<String, Alert> getAlerts() {
return alertMap;
}
private void persistAlert(String alertName, BytesReference alertSource, IndexRequest.OpType opType) {
IndexRequest indexRequest = new IndexRequest(AlertManager.ALERT_INDEX, AlertManager.ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.source(alertSource, false);
indexRequest.opType(opType);
client.index(indexRequest).actionGet();
}
private void loadAlerts() {
if (!client.admin().indices().prepareExists(AlertManager.ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " +
"{ \"match_all\" : {}}," +
"\"size\" : \"100\"" +
"}"
).setTypes(AlertManager.ALERT_TYPE).setIndices(AlertManager.ALERT_INDEX).execute().actionGet();
for (SearchHit sh : searchResponse.getHits()) {
String alertId = sh.getId();
try {
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
} catch (ElasticsearchException e) {
logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh);
}
}
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());
}
private Alert parseAlert(String alertId, SearchHit sh) {
return parseAlert(alertId, sh.getSourceRef(), sh.getVersion());
}
private Alert parseAlert(String alertId, BytesReference bytesReference, long version) {
// TODO: streaming parsing!
Map<String, Object> fields = XContentHelper.convertToMap(bytesReference, false).v2();
String query = fields.get(QUERY_FIELD.getPreferredName()).toString();
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString();
Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName());
AlertTrigger trigger = null;
if (triggerObj instanceof Map) {
Map<String, Object> triggerMap = (Map<String, Object>) triggerObj;
trigger = TriggerManager.parseTriggerFromMap(triggerMap);
} else {
throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]");
}
String timeString = fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString();
TimeValue timePeriod = TimeValue.parseTimeValue(timeString, defaultTimePeriod);
Object actionObj = fields.get(ACTION_FIELD.getPreferredName());
List<AlertAction> actions = null;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = alertActionRegistry.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]");
}
DateTime lastRan = new DateTime(0);
if( fields.get(LASTRAN_FIELD.getPreferredName()) != null){
lastRan = new DateTime(fields.get(LASTRAN_FIELD.getPreferredName()).toString());
} else if (fields.get("lastRan") != null) {
lastRan = new DateTime(fields.get("lastRan").toString());
}
DateTime running = new DateTime(0);
if (fields.get(CURRENTLY_RUNNING.getPreferredName()) != null) {
running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString());
}
DateTime lastActionFire = new DateTime(0);
if (fields.get(LAST_ACTION_FIRE.getPreferredName()) != null) {
lastActionFire = new DateTime(fields.get(LAST_ACTION_FIRE.getPreferredName()).toString());
}
List<String> indices = new ArrayList<>();
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){
indices = (List<String>)fields.get(INDICES.getPreferredName());
} else {
logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() );
}
boolean enabled = true;
if (fields.get(ENABLED.getPreferredName()) != null ) {
logger.error(ENABLED.getPreferredName() + " " + fields.get(ENABLED.getPreferredName()));
Object enabledObj = fields.get(ENABLED.getPreferredName());
enabled = parseAsBoolean(enabledObj);
}
boolean simpleQuery = true;
if (fields.get(SIMPLE_QUERY.getPreferredName()) != null ) {
logger.error(SIMPLE_QUERY.getPreferredName() + " " + fields.get(SIMPLE_QUERY.getPreferredName()));
Object enabledObj = fields.get(SIMPLE_QUERY.getPreferredName());
simpleQuery = parseAsBoolean(enabledObj);
}
Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled, simpleQuery);
alert.lastActionFire(lastActionFire);
if (fields.get(TIMESTAMP_FIELD.getPreferredName()) != null) {
alert.timestampString(fields.get(TIMESTAMP_FIELD.getPreferredName()).toString());
}
return alert;
}
private boolean parseAsBoolean(Object enabledObj) {
boolean enabled;
if (enabledObj instanceof Boolean){
enabled = (Boolean)enabledObj;
} else {
if (enabledObj.toString().toLowerCase(Locale.ROOT).equals("true") ||
enabledObj.toString().toLowerCase(Locale.ROOT).equals("1")) {
enabled = true;
} else if ( enabledObj.toString().toLowerCase(Locale.ROOT).equals("false") ||
enabledObj.toString().toLowerCase(Locale.ROOT).equals("0")) {
enabled = false;
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + enabledObj + "] as a boolean");
}
}
return enabled;
}
private ClusterHealthStatus createAlertsIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(AlertManager.ALERT_INDEX).addMapping(AlertManager.ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(AlertManager.ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
}
}

View File

@ -5,17 +5,16 @@
*/
package org.elasticsearch.alerts.plugin;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.AlertingModule;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import java.util.Collection;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
public class AlertsPlugin extends AbstractPlugin {
@ -33,7 +32,6 @@ public class AlertsPlugin extends AbstractPlugin {
@Override
public Collection<java.lang.Class<? extends LifecycleComponent>> services() {
Collection<java.lang.Class<? extends LifecycleComponent>> services = Lists.newArrayList();
services.add(AlertManager.class);
services.add(AlertScheduler.class);
return services;
}

View File

@ -5,21 +5,22 @@
*/
package org.elasticsearch.alerts.rest;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.*;
import java.io.IOException;
import java.util.Map;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestStatus.*;
public class AlertRestHandler implements RestHandler {
@ -60,7 +61,7 @@ public class AlertRestHandler implements RestHandler {
private boolean dispatchRequest(RestRequest request, RestChannel restChannel) throws IOException, InterruptedException, ExecutionException {
//@TODO : change these direct calls to actions/request/response/listener once we create the java client API
if (request.path().contains("/_refresh")) {
alertManager.refreshAlerts();
alertManager.clearAndReload();
XContentBuilder builder = getListOfAlerts();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return true;
@ -71,42 +72,22 @@ public class AlertRestHandler implements RestHandler {
} else if (request.path().contains("/_enable")) {
logger.warn("Enabling [{}]", request.param("name"));
String alertName = request.param("name");
boolean enabled = alertManager.enableAlert(alertName);
boolean enabled = true;//alertManager.enableAlert(alertName);
XContentBuilder responseBuilder = buildEnabledResponse(alertName, enabled);
restChannel.sendResponse(new BytesRestResponse(OK,responseBuilder));
return true;
} else if (request.path().contains("/_disable")) {
logger.warn("Disabling [{}]", request.param("name"));
String alertName = request.param("name");
boolean enabled = alertManager.disableAlert(alertName);
boolean enabled = true;//alertManager.disableAlert(alertName);
XContentBuilder responseBuilder = buildEnabledResponse(alertName, enabled);
restChannel.sendResponse(new BytesRestResponse(OK,responseBuilder));
return true;
} else if (request.method() == POST && request.path().contains("/_create")) {
//TODO : this should all be moved to an action
Alert alert;
try {
alert = alertManager.parseAlert(request.param("name"), XContentHelper.convertToMap(request.content(), request.contentUnsafe()).v2());
} catch (Exception e) {
logger.error("Failed to parse alert", e);
throw e;
}
try {
boolean added = alertManager.addAlert(alert.alertName(), alert, true);
if (added) {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(builder, ToXContent.EMPTY_PARAMS);
restChannel.sendResponse(new BytesRestResponse(OK, builder));
} else {
restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST));
}
} catch (ElasticsearchIllegalArgumentException eia) {
XContentBuilder failed = XContentFactory.jsonBuilder().prettyPrint();
failed.startObject();
failed.field("ERROR", eia.getMessage());
failed.endObject();
restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST,failed));
}
Alert alert = alertManager.addAlert(request.param("name"), request.content());
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(builder, ToXContent.EMPTY_PARAMS);
restChannel.sendResponse(new BytesRestResponse(OK, builder));
return true;
} else if (request.method() == DELETE) {
String alertName = request.param("name");
@ -133,12 +114,12 @@ public class AlertRestHandler implements RestHandler {
}
private XContentBuilder getListOfAlerts() throws IOException {
Map<String, Alert> alertMap = alertManager.getSafeAlertMap();
List<Alert> alerts = alertManager.getAllAlerts();
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
for (Map.Entry<String, Alert> alertEntry : alertMap.entrySet()) {
builder.field(alertEntry.getKey());
alertEntry.getValue().toXContent(builder, ToXContent.EMPTY_PARAMS);
for (Alert alert : alerts) {
builder.field(alert.alertName());
alert.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
return builder;

View File

@ -114,8 +114,7 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
public boolean deleteAlertFromSchedule(String alertName) {
try {
scheduler.deleteJob(new JobKey(alertName));
return true;
return scheduler.deleteJob(new JobKey(alertName));
} catch (SchedulerException se){
throw new ElasticsearchException("Failed to remove [" + alertName + "] from the scheduler", se);
}

View File

@ -6,11 +6,7 @@
package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionFactory;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.*;
import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.AlertTrigger;
@ -126,7 +122,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
true,
true
);
alertManager.addAlert("my-first-alert", alert, true);
alertManager.addAlert("my-first-alert", jsonBuilder().value(alert).bytes());
assertBusy(new Runnable() {
@Override
public void run() {