Dropped the binary request and response fields and rely fully on the response and request object fields.

Closes elastic/elasticsearch#36

Original commit: elastic/x-pack-elasticsearch@51370ac47d
This commit is contained in:
Martijn van Groningen 2014-11-19 12:01:55 +01:00
parent 7f177281ae
commit a6089ce8ac
14 changed files with 66 additions and 94 deletions

View File

@ -144,6 +144,11 @@ public final class AlertUtils {
* Writes the searchRequest to the specified builder.
*/
public static void writeSearchRequest(SearchRequest searchRequest, XContentBuilder builder, ToXContent.Params params) throws IOException {
if (searchRequest == null) {
builder.nullValue();
return;
}
builder.startObject();
if (Strings.hasLength(searchRequest.source())) {
XContentHelper.writeRawField("body", searchRequest.source(), builder, params);

View File

@ -6,24 +6,21 @@
package org.elasticsearch.alerts.actions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertUtils;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* An alert action entry is an event of an alert that fired on particular moment in time.
*/
public class AlertActionEntry implements ToXContent{
public class AlertActionEntry implements ToXContent {
private String id;
private long version;
@ -37,7 +34,7 @@ public class AlertActionEntry implements ToXContent{
/*Optional*/
private SearchRequest searchRequest;
private SearchResponse searchResponse;
private Map<String, Object> searchResponse;
private boolean triggered;
private String errorMsg;
@ -135,11 +132,11 @@ public class AlertActionEntry implements ToXContent{
/**
* @return The search response that resulted at out the search request that ran.
*/
public SearchResponse getSearchResponse() {
public Map<String, Object> getSearchResponse() {
return searchResponse;
}
public void setSearchResponse(SearchResponse searchResponse) {
public void setSearchResponse(Map<String, Object> searchResponse) {
this.searchResponse = searchResponse;
}
@ -191,24 +188,10 @@ public class AlertActionEntry implements ToXContent{
historyEntry.field("triggered", triggered);
historyEntry.field("fire_time", fireTime.toDateTimeISO());
historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
historyEntry.field("trigger");
historyEntry.startObject();
historyEntry.field(trigger.getTriggerName(), trigger, params);
historyEntry.endObject();
if (searchRequest != null) {
historyEntry.field("request");
AlertUtils.writeSearchRequest(searchRequest, historyEntry, params);
}
if (searchResponse != null) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
searchResponse.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
historyEntry.field("response_binary", out.toByteArray());
// Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object
historyEntry.startObject("response");
searchResponse.toXContent(historyEntry, params);
historyEntry.endObject();
}
historyEntry.field("trigger", trigger, params);
historyEntry.field("request");
AlertUtils.writeSearchRequest(searchRequest, historyEntry, params);
historyEntry.field("response", searchResponse);
historyEntry.startObject("actions");
for (AlertAction action : actions) {

View File

@ -22,7 +22,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -50,7 +49,7 @@ public class AlertActionManager extends AbstractComponent {
public static final String ERROR_MESSAGE = "error_msg";
public static final String TRIGGER_FIELD = "trigger";
public static final String REQUEST = "request";
public static final String RESPONSE = "response_binary";
public static final String RESPONSE = "response";
public static final String ACTIONS_FIELD = "actions";
public static final String ALERT_HISTORY_INDEX = ".alert_history";
@ -211,9 +210,8 @@ public class AlertActionManager extends AbstractComponent {
case REQUEST:
entry.setSearchRequest(AlertUtils.readSearchRequest(parser));
break;
case "response":
// Ignore this, the binary form is already read
parser.skipChildren();
case RESPONSE:
entry.setSearchResponse(parser.map());
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
@ -232,11 +230,6 @@ public class AlertActionManager extends AbstractComponent {
case SCHEDULED_FIRE_TIME_FIELD:
entry.setScheduledTime(DateTime.parse(parser.text()));
break;
case RESPONSE:
SearchResponse response = new SearchResponse();
response.readFrom(new BytesStreamInput(parser.binaryValue(), false));
entry.setSearchResponse(response);
break;
case ERROR_MESSAGE:
entry.setErrorMsg(parser.textOrNull());
break;

View File

@ -11,8 +11,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import javax.mail.*;
import javax.mail.internet.InternetAddress;
@ -20,6 +19,7 @@ import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class EmailAlertActionFactory implements AlertActionFactory {
@ -92,7 +92,8 @@ public class EmailAlertActionFactory implements AlertActionFactory {
message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered");
StringBuilder output = new StringBuilder();
output.append("The following query triggered because ").append(result.getTrigger().toString()).append("\n");
output.append("The total number of hits returned : ").append(result.getResponse().getHits().getTotalHits()).append("\n");
Object totalHits = XContentMapValues.extractValue("hits.total", result.getResponse());
output.append("The total number of hits returned : ").append(totalHits).append("\n");
output.append("For query : ").append(result.getRequest());
output.append("\n");
output.append("Indices : ");
@ -104,11 +105,13 @@ public class EmailAlertActionFactory implements AlertActionFactory {
output.append("\n");
if (emailAlertAction.getDisplayField() != null) {
for (SearchHit sh : result.getResponse().getHits().getHits()) {
if (sh.sourceAsMap().containsKey(emailAlertAction.getDisplayField())) {
output.append(sh.sourceAsMap().get(emailAlertAction.getDisplayField()).toString());
List<Map<String, Object>> hits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", result.getResponse());
for (Map<String, Object> hit : hits) {
Map<String, Object> _source = (Map<String, Object>) hit.get("_source");
if (_source.containsKey(emailAlertAction.getDisplayField())) {
output.append(_source.get(emailAlertAction.getDisplayField()).toString());
} else {
output.append(new String(sh.source()));
output.append(_source);
}
output.append("\n");
}

View File

@ -12,10 +12,10 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
@ -70,7 +70,7 @@ public class IndexAlertActionFactory implements AlertActionFactory {
try {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder = result.getResponse().toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
resultBuilder.field("response", result.getResponse());
resultBuilder.field("timestamp", alert.lastActionFire()); ///@TODO FIXME the firetime should be in the result ?
resultBuilder.endObject();
indexRequest.source(resultBuilder);

View File

@ -10,7 +10,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
@ -70,35 +69,22 @@ public class ScriptedTriggerFactory implements TriggerFactory {
}
@Override
public boolean isTriggered(AlertTrigger trigger, SearchRequest request, SearchResponse response) {
public boolean isTriggered(AlertTrigger trigger, SearchRequest request, Map<String, Object> response) {
if (! (trigger instanceof ScriptedTrigger) ){
throw new ElasticsearchIllegalStateException("Failed to evaluate isTriggered expected type ["
+ ScriptedTrigger.class + "] got [" + trigger.getClass() + "]");
}
ScriptedTrigger scriptedTrigger = (ScriptedTrigger)trigger;
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
ExecutableScript executable = scriptService.executable(
scriptedTrigger.getScriptLang(), scriptedTrigger.getScript(), scriptedTrigger.getScriptType(), responseMap
);
Object returnValue = executable.run();
if (returnValue instanceof Boolean) {
return (Boolean) returnValue;
} else {
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptedTrigger + "] did not return a Boolean");
}
} catch (IOException e) {
throw new ElasticsearchException("Failed to execute trigger", e);
ExecutableScript executable = scriptService.executable(
scriptedTrigger.getScriptLang(), scriptedTrigger.getScript(), scriptedTrigger.getScriptType(), response
);
Object returnValue = executable.run();
if (returnValue instanceof Boolean) {
return (Boolean) returnValue;
} else {
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptedTrigger + "] did not return a Boolean");
}
}
}

View File

@ -7,10 +7,10 @@ package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
public interface TriggerFactory {
@ -30,6 +30,6 @@ public interface TriggerFactory {
* @param response
* @return
*/
boolean isTriggered(AlertTrigger trigger, SearchRequest request, SearchResponse response);
boolean isTriggered(AlertTrigger trigger, SearchRequest request, Map<String, Object> response);
}

View File

@ -20,8 +20,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
@ -31,6 +30,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class TriggerManager extends AbstractComponent {
private static final String FIRE_TIME_VARIABLE_NAME = "FIRE_TIME";
@ -111,10 +112,12 @@ public class TriggerManager extends AbstractComponent {
}
}
return isTriggered(alert.trigger(), request, response);
XContentBuilder builder = jsonBuilder().startObject().value(response).endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
return isTriggered(alert.trigger(), request, responseMap);
}
protected TriggerResult isTriggered(AlertTrigger trigger, SearchRequest request, SearchResponse response) {
protected TriggerResult isTriggered(AlertTrigger trigger, SearchRequest request, Map<String, Object> response) {
TriggerFactory factory = triggersImplemented.get(trigger.getTriggerName());
if (factory == null) {

View File

@ -6,7 +6,8 @@
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import java.util.Map;
/**
*/
@ -14,10 +15,10 @@ public class TriggerResult {
private final boolean triggered;
private final SearchRequest request;
private final SearchResponse response;
private final Map<String, Object> response;
private final AlertTrigger trigger;
public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response, AlertTrigger trigger) {
public TriggerResult(boolean triggered, SearchRequest request, Map<String, Object> response, AlertTrigger trigger) {
this.triggered = triggered;
this.request = request;
this.response = response;
@ -32,7 +33,7 @@ public class TriggerResult {
return request;
}
public SearchResponse getResponse() {
public Map<String, Object> getResponse() {
return response;
}

View File

@ -34,9 +34,6 @@
"type" : "object",
"dynamic" : true
},
"response_binary" : {
"type" : "binary"
},
"response" : {
"type" : "object",
"dynamic" : true

View File

@ -32,9 +32,6 @@
"enabled" : false,
"dynamic" : true
},
"request_binary": {
"type" : "binary"
},
"request": {
"type" : "object",
"enabled" : false,

View File

@ -55,7 +55,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = new SearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
alertsClient.prepareIndexAlert("my-first-alert")
.setAlertSource(alertSource)

View File

@ -24,7 +24,6 @@ import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedTrigger;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
@ -34,6 +33,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptService;
@ -98,18 +98,16 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
builder.field(AlertActionManager.FIRE_TIME_FIELD, formatter.printer().print(fireTime));
builder.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, formatter.printer().print(scheduledFireTime));
builder.field(AlertActionManager.TRIGGER_FIELD, triggerMap);
BytesStreamOutput out = new BytesStreamOutput();
SearchRequest searchRequest = new SearchRequest("test123");
searchRequest.writeTo(out);
builder.field(AlertActionManager.REQUEST);
AlertUtils.writeSearchRequest(searchRequest, builder, ToXContent.EMPTY_PARAMS);
SearchResponse searchResponse = new SearchResponse(
new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false),
null, 1, 1, 0, new ShardSearchFailure[0]
);
out = new BytesStreamOutput();
searchResponse.writeTo(out);
builder.field(AlertActionManager.RESPONSE, out.bytes());
builder.startObject(AlertActionManager.RESPONSE);
builder.value(searchResponse);
builder.endObject();
builder.field(AlertActionManager.ACTIONS_FIELD, actionMap);
builder.field(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED.toString());
builder.endObject();
@ -123,7 +121,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getEntryState(), AlertActionState.SEARCH_NEEDED);
assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10);
assertEquals(XContentMapValues.extractValue("hits.total", actionEntry.getSearchResponse()), 10);
}
@Test

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.script.ScriptEngineService;
@ -25,8 +26,11 @@ import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class TriggerUnitTest extends ElasticsearchTestCase {
@ -69,14 +73,16 @@ public class TriggerUnitTest extends ElasticsearchTestCase {
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500l, new ShardSearchFailure[0]);
assertFalse(triggerManager.isTriggered(trigger, request, response).isTriggered());
XContentBuilder responseBuilder = jsonBuilder().startObject().value(response).endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(responseBuilder.bytes(), false).v2();
assertFalse(triggerManager.isTriggered(trigger, request, responseMap).isTriggered());
builder = createTriggerContent("return true", null, null);
parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
trigger = triggerManager.instantiateAlertTrigger(parser);
assertTrue(triggerManager.isTriggered(trigger, request, response).isTriggered());
assertTrue(triggerManager.isTriggered(trigger, request, responseMap).isTriggered());
tp.shutdownNow();