Merge branch 'master' of https://github.com/elasticsearch/elasticsearch-alerts
Original commit: elastic/x-pack-elasticsearch@bf4f4669f1
This commit is contained in:
commit
aa6a51306c
|
@ -7,20 +7,12 @@ package org.elasticsearch.alerts;
|
|||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.alerts.actions.AlertAction;
|
||||
import org.elasticsearch.alerts.actions.AlertActionRegistry;
|
||||
import org.elasticsearch.alerts.triggers.AlertTrigger;
|
||||
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class Alert implements ToXContent {
|
||||
|
@ -52,11 +44,10 @@ public class Alert implements ToXContent {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
|
||||
builder.field(AlertsStore.REQUEST_BINARY_FIELD.getPreferredName(), out.toByteArray());
|
||||
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
|
||||
builder.field(AlertsStore.ENABLE.getPreferredName(), enabled);
|
||||
builder.field(AlertsStore.REQUEST_FIELD.getPreferredName());
|
||||
AlertUtils.writeSearchRequest(searchRequest, builder);
|
||||
if (lastActionFire != null) {
|
||||
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public final class AlertUtils {
|
||||
|
||||
private AlertUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a new search request instance for the specified parser.
|
||||
*/
|
||||
public static SearchRequest readSearchRequest(XContentParser parser) throws IOException {
|
||||
String searchRequestFieldName = null;
|
||||
XContentParser.Token token;
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // TODO: make options configurable
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
searchRequestFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "indices":
|
||||
List<String> indices = new ArrayList<>();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
indices.add(parser.textOrNull());
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
|
||||
}
|
||||
}
|
||||
searchRequest.indices(indices.toArray(new String[indices.size()]));
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "body":
|
||||
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
|
||||
builder.copyCurrentStructure(parser);
|
||||
searchRequest.source(builder);
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else if (token.isValue()) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "template_name":
|
||||
searchRequest.templateName(parser.textOrNull());
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
}
|
||||
return searchRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the searchRequest to the specified builder.
|
||||
*/
|
||||
public static void writeSearchRequest(SearchRequest searchRequest, XContentBuilder builder) throws IOException {
|
||||
builder.startObject();
|
||||
if (Strings.hasLength(searchRequest.source())) {
|
||||
XContentHelper.writeRawField("body", searchRequest.source(), builder, ToXContent.EMPTY_PARAMS);
|
||||
}
|
||||
if (searchRequest.templateName() != null) {
|
||||
builder.field("template_name", searchRequest.templateName());
|
||||
}
|
||||
builder.startArray("indices");
|
||||
for (String index : searchRequest.indices()) {
|
||||
builder.value(index);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
}
|
|
@ -10,10 +10,8 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
|||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.alerts.actions.AlertAction;
|
||||
import org.elasticsearch.alerts.actions.AlertActionRegistry;
|
||||
import org.elasticsearch.alerts.triggers.TriggerManager;
|
||||
|
@ -25,20 +23,17 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -57,7 +52,6 @@ public class AlertsStore extends AbstractComponent {
|
|||
public static final ParseField ACTION_FIELD = new ParseField("actions");
|
||||
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire");
|
||||
public static final ParseField ENABLE = new ParseField("enable");
|
||||
public static final ParseField REQUEST_BINARY_FIELD = new ParseField("request_binary");
|
||||
public static final ParseField REQUEST_FIELD = new ParseField("request");
|
||||
|
||||
private final Client client;
|
||||
|
@ -265,51 +259,7 @@ public class AlertsStore extends AbstractComponent {
|
|||
List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser);
|
||||
alert.actions(actions);
|
||||
} else if (REQUEST_FIELD.match(currentFieldName)) {
|
||||
String searchRequestFieldName = null;
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // TODO: make options configurable
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
searchRequestFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "indices":
|
||||
List<String> indices = new ArrayList<>();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
indices.add(parser.textOrNull());
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
|
||||
}
|
||||
}
|
||||
searchRequest.indices(indices.toArray(new String[indices.size()]));
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "body":
|
||||
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
|
||||
builder.copyCurrentStructure(parser);
|
||||
searchRequest.source(builder);
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else if (token.isValue()) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "template_name":
|
||||
searchRequest.templateName(parser.textOrNull());
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
}
|
||||
alert.setSearchRequest(searchRequest);
|
||||
alert.setSearchRequest(AlertUtils.readSearchRequest(parser));
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
|
||||
}
|
||||
|
@ -320,10 +270,6 @@ public class AlertsStore extends AbstractComponent {
|
|||
alert.enabled(parser.booleanValue());
|
||||
} else if (LAST_ACTION_FIRE.match(currentFieldName)) {
|
||||
alert.lastActionFire(DateTime.parse(parser.textOrNull()));
|
||||
} else if (REQUEST_BINARY_FIELD.match(currentFieldName)) {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.readFrom(new BytesStreamInput(parser.binaryValue(), false));
|
||||
alert.setSearchRequest(searchRequest);
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.alerts.actions;
|
|||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.alerts.Alert;
|
||||
import org.elasticsearch.alerts.AlertUtils;
|
||||
import org.elasticsearch.alerts.triggers.AlertTrigger;
|
||||
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
|
@ -192,14 +193,12 @@ public class AlertActionEntry implements ToXContent{
|
|||
historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
|
||||
historyEntry.field("trigger", trigger, params);
|
||||
|
||||
ByteArrayOutputStream out;
|
||||
if (searchRequest != null) {
|
||||
out = new ByteArrayOutputStream();
|
||||
searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
|
||||
historyEntry.field("request_binary", out.toByteArray());
|
||||
historyEntry.field("request");
|
||||
AlertUtils.writeSearchRequest(searchRequest, historyEntry);
|
||||
}
|
||||
if (searchResponse != null) {
|
||||
out = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
searchResponse.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
|
||||
historyEntry.field("response_binary", out.toByteArray());
|
||||
// Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object
|
||||
|
|
|
@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.alerts.*;
|
||||
|
@ -48,7 +47,7 @@ public class AlertActionManager extends AbstractComponent {
|
|||
public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduled_fire_time";
|
||||
public static final String ERROR_MESSAGE = "errorMsg";
|
||||
public static final String TRIGGER_FIELD = "trigger";
|
||||
public static final String REQUEST = "request_binary";
|
||||
public static final String REQUEST = "request";
|
||||
public static final String RESPONSE = "response_binary";
|
||||
public static final String ACTIONS_FIELD = "actions";
|
||||
|
||||
|
@ -199,6 +198,9 @@ public class AlertActionManager extends AbstractComponent {
|
|||
case TRIGGER_FIELD:
|
||||
entry.setTrigger(triggerManager.instantiateAlertTrigger(parser));
|
||||
break;
|
||||
case REQUEST:
|
||||
entry.setSearchRequest(AlertUtils.readSearchRequest(parser));
|
||||
break;
|
||||
case "response":
|
||||
// Ignore this, the binary form is already read
|
||||
parser.skipChildren();
|
||||
|
@ -221,11 +223,6 @@ public class AlertActionManager extends AbstractComponent {
|
|||
case SCHEDULED_FIRE_TIME_FIELD:
|
||||
entry.setScheduledTime(DateTime.parse(parser.text()));
|
||||
break;
|
||||
case REQUEST:
|
||||
SearchRequest request = new SearchRequest();
|
||||
request.readFrom(new BytesStreamInput(parser.binaryValue(), false));
|
||||
entry.setSearchRequest(request);
|
||||
break;
|
||||
case RESPONSE:
|
||||
SearchResponse response = new SearchResponse();
|
||||
response.readFrom(new BytesStreamInput(parser.binaryValue(), false));
|
||||
|
|
|
@ -30,8 +30,9 @@
|
|||
"errorMsg": {
|
||||
"type": "string"
|
||||
},
|
||||
"request_binary" : {
|
||||
"type" : "binary"
|
||||
"request" : {
|
||||
"type" : "object",
|
||||
"dynamic" : true
|
||||
},
|
||||
"response_binary" : {
|
||||
"type" : "binary"
|
||||
|
|
|
@ -17,14 +17,11 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.TestCluster;
|
||||
|
@ -78,19 +75,8 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
|||
builder.field("schedule", cron);
|
||||
builder.field("enable", true);
|
||||
|
||||
builder.startObject("request");
|
||||
if (Strings.hasLength(request.source())) {
|
||||
XContentHelper.writeRawField("body", request.source(), builder, ToXContent.EMPTY_PARAMS);
|
||||
}
|
||||
if (request.templateName() != null) {
|
||||
builder.field("template_name", request.templateName());
|
||||
}
|
||||
builder.startArray("indices");
|
||||
for (String index : request.indices()) {
|
||||
builder.value(index);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
builder.field("request");
|
||||
AlertUtils.writeSearchRequest(request, builder);
|
||||
|
||||
builder.startObject("trigger");
|
||||
builder.startObject("script");
|
||||
|
|
|
@ -11,50 +11,45 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.alerts.Alert;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertUtils;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.alerts.client.AlertsClient;
|
||||
import org.elasticsearch.alerts.client.AlertsClientInterface;
|
||||
import org.elasticsearch.alerts.plugin.AlertsPlugin;
|
||||
import org.elasticsearch.alerts.transport.actions.index.IndexAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.index.IndexAlertResponse;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
|
||||
import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
|
||||
import org.elasticsearch.alerts.transport.actions.index.IndexAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.index.IndexAlertResponse;
|
||||
import org.elasticsearch.alerts.triggers.AlertTrigger;
|
||||
import org.elasticsearch.alerts.triggers.ScriptedTrigger;
|
||||
import org.elasticsearch.alerts.triggers.TriggerResult;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.mapper.core.DateFieldMapper;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.search.internal.InternalSearchHits;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -107,7 +102,8 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
|
|||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
SearchRequest searchRequest = new SearchRequest("test123");
|
||||
searchRequest.writeTo(out);
|
||||
builder.field(AlertActionManager.REQUEST, out.bytes());
|
||||
builder.field(AlertActionManager.REQUEST);
|
||||
AlertUtils.writeSearchRequest(searchRequest, builder);
|
||||
SearchResponse searchResponse = new SearchResponse(
|
||||
new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false),
|
||||
null, 1, 1, 0, new ShardSearchFailure[0]
|
||||
|
|
Loading…
Reference in New Issue