Moved over the streaming parsing.

Original commit: elastic/x-pack-elasticsearch@10bd127df5
This commit is contained in:
Martijn van Groningen 2014-10-29 00:54:18 +01:00
parent 6b2fbe400e
commit 3625b5bc91
12 changed files with 346 additions and 265 deletions

View File

@ -17,7 +17,7 @@ import java.util.List;
public class Alert implements ToXContent { public class Alert implements ToXContent {
private final String alertName; private String alertName;
private String queryName; private String queryName;
private AlertTrigger trigger; private AlertTrigger trigger;
private TimeValue timePeriod; private TimeValue timePeriod;
@ -93,6 +93,10 @@ public class Alert implements ToXContent {
return alertName; return alertName;
} }
public void alertName(String alertName) {
this.alertName = alertName;
}
public String queryName() { public String queryName() {
return queryName; return queryName;
} }
@ -141,6 +145,9 @@ public class Alert implements ToXContent {
this.lastRan = lastRan; this.lastRan = lastRan;
} }
public Alert() {
}
public Alert(String alertName, String queryName, AlertTrigger trigger, public Alert(String alertName, String queryName, AlertTrigger trigger,
TimeValue timePeriod, List<AlertAction> actions, String schedule, DateTime lastRan, TimeValue timePeriod, List<AlertAction> actions, String schedule, DateTime lastRan,
List<String> indices, DateTime running, long version, boolean enabled, boolean simpleQuery){ List<String> indices, DateTime running, long version, boolean enabled, boolean simpleQuery){
@ -163,35 +170,49 @@ public class Alert implements ToXContent {
//Note we deliberately don't serialize the version here //Note we deliberately don't serialize the version here
builder.startObject(); builder.startObject();
builder.field(AlertsStore.QUERY_FIELD.getPreferredName(), queryName); if (queryName != null) {
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule); builder.field(AlertsStore.QUERY_NAME_FIELD.getPreferredName(), queryName);
builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod); }
builder.field(AlertsStore.LASTRAN_FIELD.getPreferredName(), lastRan); if (schedule != null) {
builder.field(AlertsStore.CURRENTLY_RUNNING.getPreferredName(), running); builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
}
if (timePeriod != null) {
builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
}
if (lastRan != null) {
builder.field(AlertsStore.LASTRAN_FIELD.getPreferredName(), lastRan);
}
if (running != null) {
builder.field(AlertsStore.CURRENTLY_RUNNING.getPreferredName(), running);
}
builder.field(AlertsStore.ENABLED.getPreferredName(), enabled); builder.field(AlertsStore.ENABLED.getPreferredName(), enabled);
builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery); builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery);
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire); if (lastActionFire != null) {
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName()); }
trigger.toXContent(builder, params);
builder.field(AlertsStore.ACTION_FIELD.getPreferredName()); if (actions != null && !actions.isEmpty()) {
builder.startObject(AlertsStore.ACTION_FIELD.getPreferredName());
builder.startObject(); for (AlertAction action : actions){
for (AlertAction action : actions){ builder.field(action.getActionName());
builder.field(action.getActionName()); action.toXContent(builder, params);
action.toXContent(builder, params); }
builder.endObject();
} }
builder.endObject();
if (indices != null && !indices.isEmpty()) { if (indices != null && !indices.isEmpty()) {
builder.field(AlertsStore.INDICES.getPreferredName()); builder.startArray(AlertsStore.INDICES.getPreferredName());
builder.startArray();
for (String index : indices){ for (String index : indices){
builder.value(index); builder.value(index);
} }
builder.endArray(); builder.endArray();
} }
if (trigger != null) {
builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder, params);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -17,7 +17,6 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
@ -30,24 +29,19 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/** /**
*/ */
public class AlertsStore extends AbstractComponent { public class AlertsStore extends AbstractComponent {
public static final ParseField QUERY_FIELD = new ParseField("query"); public static final ParseField QUERY_NAME_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod"); public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
@ -200,96 +194,73 @@ public class AlertsStore extends AbstractComponent {
return parseAlert(alertId, sh.getSourceRef(), sh.getVersion()); return parseAlert(alertId, sh.getSourceRef(), sh.getVersion());
} }
private Alert parseAlert(String alertId, BytesReference bytesReference, long version) { private Alert parseAlert(String alertName, BytesReference source, long version) {
// TODO: streaming parsing! Alert alert = new Alert();
Map<String, Object> fields = XContentHelper.convertToMap(bytesReference, false).v2(); alert.alertName(alertName);
String query = fields.get(QUERY_FIELD.getPreferredName()).toString(); alert.version(version);
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString(); try (XContentParser parser = XContentHelper.createParser(source)) {
Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName()); String currentFieldName = null;
AlertTrigger trigger = null; XContentParser.Token token = parser.nextToken();
if (triggerObj instanceof Map) { assert token == XContentParser.Token.START_OBJECT;
Map<String, Object> triggerMap = (Map<String, Object>) triggerObj; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
trigger = TriggerManager.parseTriggerFromMap(triggerMap); if (token == XContentParser.Token.FIELD_NAME) {
} else { currentFieldName = parser.currentName();
throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]"); } else if (token == XContentParser.Token.START_OBJECT) {
} if (TRIGGER_FIELD.match(currentFieldName)) {
alert.trigger(TriggerManager.parseTrigger(parser));
String timeString = fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString(); } else if (ACTION_FIELD.match(currentFieldName)) {
TimeValue timePeriod = TimeValue.parseTimeValue(timeString, defaultTimePeriod); List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser);
alert.actions(actions);
Object actionObj = fields.get(ACTION_FIELD.getPreferredName()); } else {
List<AlertAction> actions = null; throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
if (actionObj instanceof Map) { }
Map<String, Object> actionMap = (Map<String, Object>) actionObj; } else if (token == XContentParser.Token.START_ARRAY) {
actions = alertActionRegistry.parseActionsFromMap(actionMap); if (INDICES.match(currentFieldName)) {
} else { List<String> indices = new ArrayList<>();
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]"); while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
} indices.add(parser.text());
}
DateTime lastRan = new DateTime(0); alert.indices(indices);
if( fields.get(LASTRAN_FIELD.getPreferredName()) != null){ } else {
lastRan = new DateTime(fields.get(LASTRAN_FIELD.getPreferredName()).toString()); throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
} else if (fields.get("lastRan") != null) { }
lastRan = new DateTime(fields.get("lastRan").toString()); } else if (token.isValue()) {
} if (QUERY_NAME_FIELD.match(currentFieldName)) {
alert.queryName(parser.textOrNull());
DateTime running = new DateTime(0); } else if (SCHEDULE_FIELD.match(currentFieldName)) {
if (fields.get(CURRENTLY_RUNNING.getPreferredName()) != null) { alert.schedule(parser.textOrNull());
running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString()); } else if (TIMEPERIOD_FIELD.match(currentFieldName)) {
} alert.timestampString(parser.textOrNull());
} else if (LASTRAN_FIELD.match(currentFieldName)) {
DateTime lastActionFire = new DateTime(0); alert.lastRan(DateTime.parse(parser.textOrNull()));
if (fields.get(LAST_ACTION_FIRE.getPreferredName()) != null) { } else if (CURRENTLY_RUNNING.match(currentFieldName)) {
lastActionFire = new DateTime(fields.get(LAST_ACTION_FIRE.getPreferredName()).toString()); alert.running(DateTime.parse(parser.textOrNull()));
} } else if (ENABLED.match(currentFieldName)) {
alert.enabled(parser.booleanValue());
List<String> indices = new ArrayList<>(); } else if (SIMPLE_QUERY.match(currentFieldName)) {
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){ alert.simpleQuery(parser.booleanValue());
indices = (List<String>)fields.get(INDICES.getPreferredName()); } else if (TIMEPERIOD_FIELD.match(currentFieldName)) {
} else { alert.timePeriod(TimeValue.parseTimeValue(parser.textOrNull(), defaultTimePeriod));
logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() ); } else if (LAST_ACTION_FIRE.match(currentFieldName)) {
} alert.lastActionFire(DateTime.parse(parser.textOrNull()));
} else {
boolean enabled = true; throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
if (fields.get(ENABLED.getPreferredName()) != null ) { }
logger.error(ENABLED.getPreferredName() + " " + fields.get(ENABLED.getPreferredName())); } else {
Object enabledObj = fields.get(ENABLED.getPreferredName()); throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
enabled = parseAsBoolean(enabledObj); }
}
boolean simpleQuery = true;
if (fields.get(SIMPLE_QUERY.getPreferredName()) != null ) {
logger.error(SIMPLE_QUERY.getPreferredName() + " " + fields.get(SIMPLE_QUERY.getPreferredName()));
Object enabledObj = fields.get(SIMPLE_QUERY.getPreferredName());
simpleQuery = parseAsBoolean(enabledObj);
}
Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled, simpleQuery);
alert.lastActionFire(lastActionFire);
if (fields.get(TIMESTAMP_FIELD.getPreferredName()) != null) {
alert.timestampString(fields.get(TIMESTAMP_FIELD.getPreferredName()).toString());
}
return alert;
}
private boolean parseAsBoolean(Object enabledObj) {
boolean enabled;
if (enabledObj instanceof Boolean){
enabled = (Boolean)enabledObj;
} else {
if (enabledObj.toString().toLowerCase(Locale.ROOT).equals("true") ||
enabledObj.toString().toLowerCase(Locale.ROOT).equals("1")) {
enabled = true;
} else if ( enabledObj.toString().toLowerCase(Locale.ROOT).equals("false") ||
enabledObj.toString().toLowerCase(Locale.ROOT).equals("0")) {
enabled = false;
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + enabledObj + "] as a boolean");
} }
} catch (IOException e) {
throw new ElasticsearchException("Error during parsing alert", e);
} }
return enabled;
if (alert.timePeriod() == null) {
alert.timePeriod(defaultTimePeriod);
}
if (alert.lastActionFire() == null) {
alert.lastActionFire(new DateTime(0));
}
return alert;
} }
private ClusterHealthStatus createAlertsIndex() { private ClusterHealthStatus createAlertsIndex() {

View File

@ -12,7 +12,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
public interface AlertAction extends ToXContent { public interface AlertAction extends ToXContent {
public String getActionName(); public String getActionName();
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException; public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
public boolean doAction(Alert alert, AlertActionEntry actionEntry); public boolean doAction(Alert alert, AlertActionEntry actionEntry);

View File

@ -5,6 +5,12 @@
*/ */
package org.elasticsearch.alerts.actions; package org.elasticsearch.alerts.actions;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public interface AlertActionFactory { public interface AlertActionFactory {
AlertAction createAction(Object parameters);
AlertAction createAction(XContentParser parser) throws IOException;
} }

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.actions; package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -15,22 +16,18 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -38,11 +35,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -197,45 +192,82 @@ public class AlertActionManager {
protected AlertActionEntry parseHistory(String historyId, SearchHit sh, long version) { protected AlertActionEntry parseHistory(String historyId, SearchHit sh, long version) {
Map<String, Object> fields = sh.sourceAsMap(); return parseHistory(historyId, sh.getSourceRef(), version);
return parseHistory(historyId, fields, version);
} }
protected AlertActionEntry parseHistory(String historyId, Map<String,Object> fields, long version) { protected AlertActionEntry parseHistory(String historyId, BytesReference source, long version) {
return parseHistory(historyId, fields, version, actionRegistry, logger); return parseHistory(historyId, source, version, actionRegistry, logger);
} }
protected static AlertActionEntry parseHistory(String historyId, Map<String,Object> fields, long version, protected static AlertActionEntry parseHistory(String historyId, BytesReference source, long version,
AlertActionRegistry actionRegistry, ESLogger logger) { AlertActionRegistry actionRegistry, ESLogger logger) {
String alertName = fields.get(ALERT_NAME_FIELD).toString(); AlertActionEntry entry = new AlertActionEntry();
boolean triggered = (Boolean)fields.get(TRIGGERED_FIELD); entry.setId(historyId);
DateTime fireTime = new DateTime(fields.get(FIRE_TIME_FIELD).toString()); entry.setVersion(version);
DateTime scheduledFireTime = new DateTime(fields.get(SCHEDULED_FIRE_TIME_FIELD).toString()); try (XContentParser parser = XContentHelper.createParser(source)) {
AlertTrigger trigger = TriggerManager.parseTriggerFromMap((Map<String,Object>)fields.get(TRIGGER_FIELD)); String currentFieldName = null;
String queryRan = fields.get(QUERY_RAN_FIELD).toString(); XContentParser.Token token = parser.nextToken();
long numberOfResults = ((Number)fields.get(NUMBER_OF_RESULTS_FIELD)).longValue(); assert token == XContentParser.Token.START_OBJECT;
Object actionObj = fields.get(ACTIONS_FIELD); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
List<AlertAction> actions; if (token == XContentParser.Token.FIELD_NAME) {
if (actionObj instanceof Map) { currentFieldName = parser.currentName();
Map<String, Object> actionMap = (Map<String, Object>) actionObj; } else if (token == XContentParser.Token.START_OBJECT) {
actions = actionRegistry.parseActionsFromMap(actionMap); switch (currentFieldName) {
} else { case ACTIONS_FIELD:
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]"); entry.setActions(actionRegistry.instantiateAlertActions(parser));
break;
case TRIGGER_FIELD:
entry.setTrigger(TriggerManager.parseTrigger(parser));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
switch (currentFieldName) {
case INDICES_FIELD:
List<String> indices = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
indices.add(parser.text());
}
entry.setIndices(indices);
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token.isValue()) {
switch (currentFieldName) {
case ALERT_NAME_FIELD:
entry.setAlertName(parser.text());
break;
case TRIGGERED_FIELD:
entry.setTriggered(parser.booleanValue());
break;
case FIRE_TIME_FIELD:
entry.setFireTime(DateTime.parse(parser.text()));
break;
case SCHEDULED_FIRE_TIME_FIELD:
entry.setScheduledTime(DateTime.parse(parser.text()));
break;
case QUERY_RAN_FIELD:
entry.setTriggeringQuery(parser.text());
break;
case NUMBER_OF_RESULTS_FIELD:
entry.setNumberOfResults(parser.longValue());
break;
case AlertActionState.FIELD_NAME:
entry.setEntryState(AlertActionState.fromString(parser.text()));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
}
}
} catch (IOException e) {
throw new ElasticsearchException("Error during parsing alert action", e);
} }
return entry;
List<String> indices = new ArrayList<>();
if (fields.get(INDICES_FIELD) != null && fields.get(INDICES_FIELD) instanceof List){
indices = (List<String>)fields.get(INDICES_FIELD);
} else {
logger.debug("Indices : " + fields.get(INDICES_FIELD) + " class " +
(fields.get(INDICES_FIELD) != null ? fields.get(INDICES_FIELD).getClass() : null ));
}
String stateString = fields.get(AlertActionState.FIELD_NAME).toString();
AlertActionState state = AlertActionState.fromString(stateString);
return new AlertActionEntry(historyId, version, alertName, triggered, fireTime, scheduledFireTime, trigger, queryRan,
numberOfResults, actions, indices, state);
} }
@ -309,7 +341,7 @@ public class AlertActionManager {
getRequest.id(entryId); getRequest.id(entryId);
GetResponse getResponse = client.get(getRequest).actionGet(); GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) { if (getResponse.isExists()) {
return parseHistory(entryId, getResponse.getSourceAsMap(), getResponse.getVersion()); return parseHistory(entryId, getResponse.getSourceAsBytesRef(), getResponse.getVersion());
} else { } else {
throw new ElasticsearchException("Unable to find [" + entryId + "] in the [" + ALERT_HISTORY_INDEX + "]" ); throw new ElasticsearchException("Unable to find [" + entryId + "] in the [" + ALERT_HISTORY_INDEX + "]" );
} }

View File

@ -7,16 +7,16 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
public class AlertActionRegistry extends AbstractComponent { public class AlertActionRegistry extends AbstractComponent {
@ -37,16 +37,23 @@ public class AlertActionRegistry extends AbstractComponent {
.build(); .build();
} }
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) { public List<AlertAction> instantiateAlertActions(XContentParser parser) throws IOException {
ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
List<AlertAction> actions = new ArrayList<>(); List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet()) { ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
AlertActionFactory factory = actionImplemented.get(actionEntry.getKey()); String actionFactoryName = null;
if (factory != null) { XContentParser.Token token;
actions.add(factory.createAction(actionEntry.getValue())); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
} else { if (token == XContentParser.Token.FIELD_NAME) {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]"); actionFactoryName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
AlertActionFactory factory = actionImplemented.get(actionFactoryName);
if (factory != null) {
actions.add(factory.createAction(parser));
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionFactoryName + "]");
}
} }
} }
return actions; return actions;
} }

View File

@ -9,25 +9,27 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.mail.*; import javax.mail.*;
import javax.mail.internet.AddressException; import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress; import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class EmailAlertAction implements AlertAction { public class EmailAlertAction implements AlertAction {
List<Address> emailAddresses = new ArrayList<>();
String displayField = null;
private String displayField = null;
private List<Address> emailAddresses = new ArrayList<>();
// TODO: Move to factory and make configurable
int port = 587;
String server = "smtp.gmail.com";
String from = "esalertingtest@gmail.com"; String from = "esalertingtest@gmail.com";
String passwd = "elasticsearchforthewin"; String passwd = "elasticsearchforthewin";
String server = "smtp.gmail.com";
int port = 587;
public EmailAlertAction(String ... addresses){ public EmailAlertAction(String displayField, String ... addresses){
for (String address : addresses) { for (String address : addresses) {
addEmailAddress(address); addEmailAddress(address);
} }

View File

@ -5,37 +5,47 @@
*/ */
package org.elasticsearch.alerts.actions; package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
public class EmailAlertActionFactory implements AlertActionFactory { public class EmailAlertActionFactory implements AlertActionFactory {
@Override @Override
public AlertAction createAction(Object parameters) { public AlertAction createAction(XContentParser parser) throws IOException {
EmailAlertAction action = new EmailAlertAction(); String display = null;
if (parameters instanceof List){ List<String> addresses = new ArrayList<>();
for (String emailAddress : (List<String>)parameters) {
action.addEmailAddress(emailAddress); String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "display":
display = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
switch (currentFieldName) {
case "addresses":
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
addresses.add(parser.text());
}
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
} }
} else if (parameters instanceof Map) {
Map<String,Object> paramMap = (Map<String,Object>)parameters;
Object addresses = paramMap.get("addresses");
if (addresses == null){
throw new ElasticsearchException("Unable to parse email addresses from : " + parameters);
}
for (String emailAddress : (List<String>)addresses) {
action.addEmailAddress(emailAddress);
}
Object displayField = paramMap.get("display");
if (displayField != null){
action.displayField(displayField.toString());
}
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an EmailAlertAction");
} }
return action; return new EmailAlertAction(display, addresses.toArray(new String[addresses.size()]));
} }
} }

View File

@ -7,42 +7,46 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.Locale; import java.io.IOException;
import java.util.Map;
/** /**
* Created by brian on 8/17/14. * Created by brian on 8/17/14.
*/ */
public class IndexAlertActionFactory implements AlertActionFactory { public class IndexAlertActionFactory implements AlertActionFactory {
Client client;
private final Client client;
public IndexAlertActionFactory(Client client){ public IndexAlertActionFactory(Client client){
this.client = client; this.client = client;
} }
@Override @Override
public AlertAction createAction(Object parameters) { public AlertAction createAction(XContentParser parser) throws IOException {
try { String index = null;
if (parameters instanceof Map) { String type = null;
Map<String, Object> paramMap = (Map<String, Object>) parameters;
String index = paramMap.get("index").toString();
if (!index.toLowerCase(Locale.ROOT).equals(index)) {
throw new ElasticsearchIllegalArgumentException("Index names must be all lowercase");
}
String type = paramMap.get("type").toString(); String currentFieldName = null;
if (!type.toLowerCase(Locale.ROOT).equals(type)) { XContentParser.Token token;
throw new ElasticsearchIllegalArgumentException("Type names must be all lowercase"); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "index":
index = parser.text();
break;
case "type":
type = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
} }
return new IndexAlertAction(index, type, client);
} else { } else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an IndexAlertAction"); throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
} }
} catch (Throwable t){
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an IndexAlertAction", t);
} }
return new IndexAlertAction(index, type, client);
} }
} }

View File

@ -13,14 +13,11 @@ import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import java.util.Locale; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -32,34 +29,52 @@ public class TriggerManager extends AbstractComponent {
private final AlertManager alertManager; private final AlertManager alertManager;
private final ScriptService scriptService; private final ScriptService scriptService;
public static AlertTrigger parseTriggerFromMap(Map<String, Object> triggerMap) { public static AlertTrigger parseTrigger(XContentParser parser) throws IOException {
for (Map.Entry<String,Object> entry : triggerMap.entrySet()){ AlertTrigger trigger = null;
AlertTrigger.TriggerType type = AlertTrigger.TriggerType.fromString(entry.getKey());
if (type == AlertTrigger.TriggerType.SCRIPT) {
ScriptedAlertTrigger scriptedTrigger = parseScriptedTrigger(entry.getValue());
return new AlertTrigger(scriptedTrigger);
} else {
AlertTrigger.SimpleTrigger simpleTrigger = AlertTrigger.SimpleTrigger.fromString(entry.getValue().toString().substring(0, 1));
int value = Integer.valueOf(entry.getValue().toString().substring(1));
return new AlertTrigger(simpleTrigger, type, value);
}
}
throw new ElasticsearchIllegalArgumentException();
}
private static ScriptedAlertTrigger parseScriptedTrigger(Object value) { String currentFieldName = null;
if (value instanceof Map) { XContentParser.Token token;
Map<String,Object> valueMap = (Map<String,Object>)value; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
try { if (token == XContentParser.Token.FIELD_NAME) {
return new ScriptedAlertTrigger(valueMap.get("script").toString(), currentFieldName = parser.currentName();
ScriptService.ScriptType.valueOf(valueMap.get("script_type").toString().toUpperCase(Locale.ROOT)), ///TODO : Fix ScriptType to parse strings properly, currently only accepts uppercase versions of the enum names } else if (token == XContentParser.Token.START_OBJECT) {
valueMap.get("script_lang").toString()); switch (currentFieldName) {
} catch (Exception e){ case "script":
throw new ElasticsearchIllegalArgumentException("Unable to parse " + value + " as a ScriptedAlertTrigger", e); String script = null;
ScriptService.ScriptType scriptType = null;
String scriptLang = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "script" :
script = parser.text();
break;
case "script_type" :
scriptType = ScriptService.ScriptType.valueOf(parser.text());
break;
case "script_lang" :
scriptLang = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
}
}
trigger = new AlertTrigger(new ScriptedAlertTrigger(script, scriptType, scriptLang));
break;
default:
break;
}
} else if (token.isValue()) {
String expression = parser.text();
AlertTrigger.SimpleTrigger simpleTrigger = AlertTrigger.SimpleTrigger.fromString(expression.substring(0, 1));
int value = Integer.valueOf(expression.substring(1));
trigger = new AlertTrigger(simpleTrigger, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, value);
} }
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse " + value + " as a ScriptedAlertTrigger, not a Map");
} }
return trigger;
} }
@Inject @Inject

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
@ -103,7 +104,8 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
alertActionRegistry.registerAction("test", new AlertActionFactory() { alertActionRegistry.registerAction("test", new AlertActionFactory() {
@Override @Override
public AlertAction createAction(Object parameters) { public AlertAction createAction(XContentParser parser) throws IOException {
parser.nextToken();
return alertAction; return alertAction;
} }
}); });

View File

@ -5,9 +5,12 @@
*/ */
package org.elasticsearch.alerts.actions; package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.BasicAlertingTest;
import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
@ -16,15 +19,19 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/** /**
*/ */
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1) @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1)
public class AlertActionsTest extends ElasticsearchIntegrationTest { public class AlertActionsTest extends ElasticsearchIntegrationTest {
private static final FormatDateTimeFormatter formatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
@Test @Test
public void testAlertActionParser(){ public void testAlertActionParser() throws Exception {
DateTime fireTime = new DateTime(); DateTime fireTime = new DateTime(DateTimeZone.UTC);
DateTime scheduledFireTime = new DateTime(); DateTime scheduledFireTime = new DateTime(DateTimeZone.UTC);
Map<String, Object> triggerMap = new HashMap<>(); Map<String, Object> triggerMap = new HashMap<>();
triggerMap.put("numberOfEvents", ">1"); triggerMap.put("numberOfEvents", ">1");
Map<String,Object> actionMap = new HashMap<>(); Map<String,Object> actionMap = new HashMap<>();
@ -34,18 +41,20 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
emailParamMap.put("addresses", addresses); emailParamMap.put("addresses", addresses);
actionMap.put("email", emailParamMap); actionMap.put("email", emailParamMap);
Map<String, Object> fieldMap = new HashMap<>(); XContentBuilder builder = jsonBuilder();
fieldMap.put(AlertActionManager.ALERT_NAME_FIELD, "testName"); builder.startObject();
fieldMap.put(AlertActionManager.TRIGGERED_FIELD, true); builder.field(AlertActionManager.ALERT_NAME_FIELD, "testName");
fieldMap.put(AlertActionManager.FIRE_TIME_FIELD, fireTime.toDateTimeISO().toString()); builder.field(AlertActionManager.TRIGGERED_FIELD, true);
fieldMap.put(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledFireTime.toDateTimeISO().toString()); builder.field(AlertActionManager.FIRE_TIME_FIELD, formatter.printer().print(fireTime));
fieldMap.put(AlertActionManager.TRIGGER_FIELD, triggerMap); builder.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, formatter.printer().print(scheduledFireTime));
fieldMap.put(AlertActionManager.QUERY_RAN_FIELD, "foobar"); builder.field(AlertActionManager.TRIGGER_FIELD, triggerMap);
fieldMap.put(AlertActionManager.NUMBER_OF_RESULTS_FIELD,10); builder.field(AlertActionManager.QUERY_RAN_FIELD, "foobar");
fieldMap.put(AlertActionManager.ACTIONS_FIELD, actionMap); builder.field(AlertActionManager.NUMBER_OF_RESULTS_FIELD, 10);
fieldMap.put(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString()); builder.field(AlertActionManager.ACTIONS_FIELD, actionMap);
builder.field(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString());
builder.endObject();
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", fieldMap, 0, alertActionRegistry, logger); AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry, logger);
assertEquals(actionEntry.getVersion(), 0); assertEquals(actionEntry.getVersion(), 0);
assertEquals(actionEntry.getAlertName(), "testName"); assertEquals(actionEntry.getAlertName(), "testName");