Core: Better infra for reading/writing search request in alert

Original commit: elastic/x-pack-elasticsearch@febd43058b
This commit is contained in:
Martijn van Groningen 2014-11-13 19:14:36 +01:00
parent 6a5a1710d0
commit eb3f123ad6
4 changed files with 104 additions and 82 deletions

View File

@ -7,20 +7,12 @@ package org.elasticsearch.alerts;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Alert implements ToXContent {
@ -52,11 +44,10 @@ public class Alert implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
ByteArrayOutputStream out = new ByteArrayOutputStream();
searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
builder.field(AlertsStore.REQUEST_BINARY_FIELD.getPreferredName(), out.toByteArray());
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(AlertsStore.ENABLE.getPreferredName(), enabled);
builder.field(AlertsStore.REQUEST_FIELD.getPreferredName());
AlertUtils.writeSearchRequest(searchRequest, builder);
if (lastActionFire != null) {
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
}

View File

@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*/
public final class AlertUtils {
private AlertUtils() {
}
/**
* Reads a new search request instance for the specified parser.
*/
public static SearchRequest readSearchRequest(XContentParser parser) throws IOException {
String searchRequestFieldName = null;
XContentParser.Token token;
SearchRequest searchRequest = new SearchRequest();
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // TODO: make options configurable
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
searchRequestFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
switch (searchRequestFieldName) {
case "indices":
List<String> indices = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
indices.add(parser.textOrNull());
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
searchRequest.indices(indices.toArray(new String[indices.size()]));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
switch (searchRequestFieldName) {
case "body":
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser);
searchRequest.source(builder);
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else if (token.isValue()) {
switch (searchRequestFieldName) {
case "template_name":
searchRequest.templateName(parser.textOrNull());
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
}
return searchRequest;
}
/**
* Writes the searchRequest to the specified builder.
*/
public static void writeSearchRequest(SearchRequest searchRequest, XContentBuilder builder) throws IOException {
builder.startObject();
if (Strings.hasLength(searchRequest.source())) {
XContentHelper.writeRawField("body", searchRequest.source(), builder, ToXContent.EMPTY_PARAMS);
}
if (searchRequest.templateName() != null) {
builder.field("template_name", searchRequest.templateName());
}
builder.startArray("indices");
for (String index : searchRequest.indices()) {
builder.value(index);
}
builder.endArray();
builder.endObject();
}
}

View File

@ -10,10 +10,8 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.TriggerManager;
@ -25,20 +23,17 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@ -57,7 +52,6 @@ public class AlertsStore extends AbstractComponent {
public static final ParseField ACTION_FIELD = new ParseField("actions");
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire");
public static final ParseField ENABLE = new ParseField("enable");
public static final ParseField REQUEST_BINARY_FIELD = new ParseField("request_binary");
public static final ParseField REQUEST_FIELD = new ParseField("request");
private final Client client;
@ -265,51 +259,7 @@ public class AlertsStore extends AbstractComponent {
List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser);
alert.actions(actions);
} else if (REQUEST_FIELD.match(currentFieldName)) {
String searchRequestFieldName = null;
SearchRequest searchRequest = new SearchRequest();
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // TODO: make options configurable
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
searchRequestFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
switch (searchRequestFieldName) {
case "indices":
List<String> indices = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
indices.add(parser.textOrNull());
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
searchRequest.indices(indices.toArray(new String[indices.size()]));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
switch (searchRequestFieldName) {
case "body":
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser);
searchRequest.source(builder);
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else if (token.isValue()) {
switch (searchRequestFieldName) {
case "template_name":
searchRequest.templateName(parser.textOrNull());
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
}
alert.setSearchRequest(searchRequest);
alert.setSearchRequest(AlertUtils.readSearchRequest(parser));
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
@ -320,10 +270,6 @@ public class AlertsStore extends AbstractComponent {
alert.enabled(parser.booleanValue());
} else if (LAST_ACTION_FIRE.match(currentFieldName)) {
alert.lastActionFire(DateTime.parse(parser.textOrNull()));
} else if (REQUEST_BINARY_FIELD.match(currentFieldName)) {
SearchRequest searchRequest = new SearchRequest();
searchRequest.readFrom(new BytesStreamInput(parser.binaryValue(), false));
alert.setSearchRequest(searchRequest);
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}

View File

@ -17,14 +17,11 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
@ -78,19 +75,8 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
builder.field("schedule", cron);
builder.field("enable", true);
builder.startObject("request");
if (Strings.hasLength(request.source())) {
XContentHelper.writeRawField("body", request.source(), builder, ToXContent.EMPTY_PARAMS);
}
if (request.templateName() != null) {
builder.field("template_name", request.templateName());
}
builder.startArray("indices");
for (String index : request.indices()) {
builder.value(index);
}
builder.endArray();
builder.endObject();
builder.field("request");
AlertUtils.writeSearchRequest(request, builder);
builder.startObject("trigger");
builder.startObject("script");