diff --git a/src/main/java/org/elasticsearch/alerts/Alert.java b/src/main/java/org/elasticsearch/alerts/Alert.java index 8bc4e586a97..c2cbc3e4e7a 100644 --- a/src/main/java/org/elasticsearch/alerts/Alert.java +++ b/src/main/java/org/elasticsearch/alerts/Alert.java @@ -7,20 +7,12 @@ package org.elasticsearch.alerts; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.alerts.actions.AlertAction; -import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.triggers.AlertTrigger; -import org.elasticsearch.common.io.stream.DataOutputStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.List; public class Alert implements ToXContent { @@ -52,11 +44,10 @@ public class Alert implements ToXContent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out))); - builder.field(AlertsStore.REQUEST_BINARY_FIELD.getPreferredName(), out.toByteArray()); builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule); builder.field(AlertsStore.ENABLE.getPreferredName(), enabled); + builder.field(AlertsStore.REQUEST_FIELD.getPreferredName()); + AlertUtils.writeSearchRequest(searchRequest, builder); if (lastActionFire != null) { builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire); } diff --git a/src/main/java/org/elasticsearch/alerts/AlertUtils.java b/src/main/java/org/elasticsearch/alerts/AlertUtils.java new file mode 100644 index 00000000000..3d4c486440c --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/AlertUtils.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + */ +public final class AlertUtils { + + private AlertUtils() { + } + + /** + * Reads a new search request instance for the specified parser. + */ + public static SearchRequest readSearchRequest(XContentParser parser) throws IOException { + String searchRequestFieldName = null; + XContentParser.Token token; + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // TODO: make options configurable + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + searchRequestFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + switch (searchRequestFieldName) { + case "indices": + List indices = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.VALUE_STRING) { + indices.add(parser.textOrNull()); + } else { + throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]"); + } + } + searchRequest.indices(indices.toArray(new String[indices.size()])); + break; + default: + throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); + } + } else if (token == XContentParser.Token.START_OBJECT) { + switch (searchRequestFieldName) { + case "body": + XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent()); + builder.copyCurrentStructure(parser); + searchRequest.source(builder); + break; + default: + throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); + } + } else if (token.isValue()) { + switch (searchRequestFieldName) { + case "template_name": + searchRequest.templateName(parser.textOrNull()); + break; + default: + throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); + } + } else { + throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); + } + } + return searchRequest; + } + + /** + * Writes the searchRequest to the specified builder. + */ + public static void writeSearchRequest(SearchRequest searchRequest, XContentBuilder builder) throws IOException { + builder.startObject(); + if (Strings.hasLength(searchRequest.source())) { + XContentHelper.writeRawField("body", searchRequest.source(), builder, ToXContent.EMPTY_PARAMS); + } + if (searchRequest.templateName() != null) { + builder.field("template_name", searchRequest.templateName()); + } + builder.startArray("indices"); + for (String index : searchRequest.indices()) { + builder.value(index); + } + builder.endArray(); + builder.endObject(); + } + +} diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 1aaf627dcd2..4433dde3b81 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -10,10 +10,8 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.triggers.TriggerManager; @@ -25,20 +23,17 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; @@ -57,7 +52,6 @@ public class AlertsStore extends AbstractComponent { public static final ParseField ACTION_FIELD = new ParseField("actions"); public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire"); public static final ParseField ENABLE = new ParseField("enable"); - public static final ParseField REQUEST_BINARY_FIELD = new ParseField("request_binary"); public static final ParseField REQUEST_FIELD = new ParseField("request"); private final Client client; @@ -265,51 +259,7 @@ public class AlertsStore extends AbstractComponent { List actions = alertActionRegistry.instantiateAlertActions(parser); alert.actions(actions); } else if (REQUEST_FIELD.match(currentFieldName)) { - String searchRequestFieldName = null; - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // TODO: make options configurable - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - searchRequestFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_ARRAY) { - switch (searchRequestFieldName) { - case "indices": - List indices = new ArrayList<>(); - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - if (token == XContentParser.Token.VALUE_STRING) { - indices.add(parser.textOrNull()); - } else { - throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]"); - } - } - searchRequest.indices(indices.toArray(new String[indices.size()])); - break; - default: - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); - } - } else if (token == XContentParser.Token.START_OBJECT) { - switch (searchRequestFieldName) { - case "body": - XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent()); - builder.copyCurrentStructure(parser); - searchRequest.source(builder); - break; - default: - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); - } - } else if (token.isValue()) { - switch (searchRequestFieldName) { - case "template_name": - searchRequest.templateName(parser.textOrNull()); - break; - default: - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); - } - } else { - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]"); - } - } - alert.setSearchRequest(searchRequest); + alert.setSearchRequest(AlertUtils.readSearchRequest(parser)); } else { throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } @@ -320,10 +270,6 @@ public class AlertsStore extends AbstractComponent { alert.enabled(parser.booleanValue()); } else if (LAST_ACTION_FIRE.match(currentFieldName)) { alert.lastActionFire(DateTime.parse(parser.textOrNull())); - } else if (REQUEST_BINARY_FIELD.match(currentFieldName)) { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.readFrom(new BytesStreamInput(parser.binaryValue(), false)); - alert.setSearchRequest(searchRequest); } else { throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java index 9d23a20a8f1..adb90364482 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java @@ -8,6 +8,7 @@ package org.elasticsearch.alerts.actions; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.AlertUtils; import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.common.io.stream.DataOutputStreamOutput; import org.elasticsearch.common.joda.time.DateTime; @@ -192,14 +193,12 @@ public class AlertActionEntry implements ToXContent{ historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO()); historyEntry.field("trigger", trigger, params); - ByteArrayOutputStream out; if (searchRequest != null) { - out = new ByteArrayOutputStream(); - searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out))); - historyEntry.field("request_binary", out.toByteArray()); + historyEntry.field("request"); + AlertUtils.writeSearchRequest(searchRequest, historyEntry); } if (searchResponse != null) { - out = new ByteArrayOutputStream(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); searchResponse.writeTo(new DataOutputStreamOutput(new DataOutputStream(out))); historyEntry.field("response_binary", out.toByteArray()); // Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 4445ea66e37..a61e80d0253 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.alerts.*; @@ -48,7 +47,7 @@ public class AlertActionManager extends AbstractComponent { public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduled_fire_time"; public static final String ERROR_MESSAGE = "errorMsg"; public static final String TRIGGER_FIELD = "trigger"; - public static final String REQUEST = "request_binary"; + public static final String REQUEST = "request"; public static final String RESPONSE = "response_binary"; public static final String ACTIONS_FIELD = "actions"; @@ -199,6 +198,9 @@ public class AlertActionManager extends AbstractComponent { case TRIGGER_FIELD: entry.setTrigger(triggerManager.instantiateAlertTrigger(parser)); break; + case REQUEST: + entry.setSearchRequest(AlertUtils.readSearchRequest(parser)); + break; case "response": // Ignore this, the binary form is already read parser.skipChildren(); @@ -221,11 +223,6 @@ public class AlertActionManager extends AbstractComponent { case SCHEDULED_FIRE_TIME_FIELD: entry.setScheduledTime(DateTime.parse(parser.text())); break; - case REQUEST: - SearchRequest request = new SearchRequest(); - request.readFrom(new BytesStreamInput(parser.binaryValue(), false)); - entry.setSearchRequest(request); - break; case RESPONSE: SearchResponse response = new SearchResponse(); response.readFrom(new BytesStreamInput(parser.binaryValue(), false)); diff --git a/src/main/resources/alerthistory.json b/src/main/resources/alerthistory.json index 46f33d1cdb0..7523cca7839 100644 --- a/src/main/resources/alerthistory.json +++ b/src/main/resources/alerthistory.json @@ -30,8 +30,9 @@ "errorMsg": { "type": "string" }, - "request_binary" : { - "type" : "binary" + "request" : { + "type" : "object", + "dynamic" : true }, "response_binary" : { "type" : "binary" diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index f2d824d4841..41127f1327e 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -17,14 +17,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.TestCluster; @@ -78,19 +75,8 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest builder.field("schedule", cron); builder.field("enable", true); - builder.startObject("request"); - if (Strings.hasLength(request.source())) { - XContentHelper.writeRawField("body", request.source(), builder, ToXContent.EMPTY_PARAMS); - } - if (request.templateName() != null) { - builder.field("template_name", request.templateName()); - } - builder.startArray("indices"); - for (String index : request.indices()) { - builder.value(index); - } - builder.endArray(); - builder.endObject(); + builder.field("request"); + AlertUtils.writeSearchRequest(request, builder); builder.startObject("trigger"); builder.startObject("script"); diff --git a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java index 0ad7d0bf486..bf454b5f579 100644 --- a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java +++ b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java @@ -11,50 +11,45 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.AlertManager; +import org.elasticsearch.alerts.AlertUtils; import org.elasticsearch.alerts.AlertsStore; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.client.AlertsClientInterface; import org.elasticsearch.alerts.plugin.AlertsPlugin; -import org.elasticsearch.alerts.transport.actions.index.IndexAlertRequest; -import org.elasticsearch.alerts.transport.actions.index.IndexAlertResponse; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest; import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; +import org.elasticsearch.alerts.transport.actions.index.IndexAlertRequest; +import org.elasticsearch.alerts.transport.actions.index.IndexAlertResponse; import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.ScriptedTrigger; import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTimeZone; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.core.DateFieldMapper; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.core.Is.is; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** */ @@ -107,7 +102,8 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest { BytesStreamOutput out = new BytesStreamOutput(); SearchRequest searchRequest = new SearchRequest("test123"); searchRequest.writeTo(out); - builder.field(AlertActionManager.REQUEST, out.bytes()); + builder.field(AlertActionManager.REQUEST); + AlertUtils.writeSearchRequest(searchRequest, builder); SearchResponse searchResponse = new SearchResponse( new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false), null, 1, 1, 0, new ShardSearchFailure[0]