Core: Fix use of search templates

Original commit: elastic/x-pack-elasticsearch@401b9b39f2
This commit is contained in:
Martijn van Groningen 2014-11-14 00:00:19 +01:00
parent a78d10da42
commit 8705fd04b9
7 changed files with 138 additions and 88 deletions
src
main/java/org/elasticsearch/alerts
test/java/org/elasticsearch/alerts

@ -47,7 +47,7 @@ public class Alert implements ToXContent {
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(AlertsStore.ENABLE.getPreferredName(), enabled);
builder.field(AlertsStore.REQUEST_FIELD.getPreferredName());
AlertUtils.writeSearchRequest(searchRequest, builder);
AlertUtils.writeSearchRequest(searchRequest, builder, params);
if (lastActionFire != null) {
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
}

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.util.ArrayList;
@ -53,12 +54,18 @@ public final class AlertUtils {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
XContentBuilder builder;
switch (searchRequestFieldName) {
case "body":
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser);
searchRequest.source(builder);
break;
case "template_source":
builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser);
searchRequest.templateSource(builder.bytes(), false);
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
@ -67,6 +74,9 @@ public final class AlertUtils {
case "template_name":
searchRequest.templateName(parser.textOrNull());
break;
case "template_type":
searchRequest.templateType(readScriptType(parser.textOrNull()));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
}
@ -80,14 +90,20 @@ public final class AlertUtils {
/**
* Writes the searchRequest to the specified builder.
*/
public static void writeSearchRequest(SearchRequest searchRequest, XContentBuilder builder) throws IOException {
public static void writeSearchRequest(SearchRequest searchRequest, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
if (Strings.hasLength(searchRequest.source())) {
XContentHelper.writeRawField("body", searchRequest.source(), builder, ToXContent.EMPTY_PARAMS);
XContentHelper.writeRawField("body", searchRequest.source(), builder, params);
}
if (searchRequest.templateName() != null) {
builder.field("template_name", searchRequest.templateName());
}
if (searchRequest.templateType() != null) {
builder.field("template_type", writeScriptType(searchRequest.templateType()));
}
if (Strings.hasLength(searchRequest.templateSource())) {
XContentHelper.writeRawField("template_source", searchRequest.templateSource(), builder, params);
}
builder.startArray("indices");
for (String index : searchRequest.indices()) {
builder.value(index);
@ -96,4 +112,30 @@ public final class AlertUtils {
builder.endObject();
}
private static ScriptService.ScriptType readScriptType(String value) {
switch (value) {
case "indexed":
return ScriptService.ScriptType.INDEXED;
case "inline":
return ScriptService.ScriptType.INLINE;
case "file":
return ScriptService.ScriptType.FILE;
default:
throw new ElasticsearchIllegalArgumentException("Unknown script_type value [" + value + "]");
}
}
private static String writeScriptType(ScriptService.ScriptType value) {
switch (value) {
case INDEXED:
return "indexed";
case INLINE:
return "inline";
case FILE:
return "file";
default:
throw new ElasticsearchIllegalArgumentException("Illegal script_type value [" + value + "]");
}
}
}

@ -195,7 +195,7 @@ public class AlertActionEntry implements ToXContent{
if (searchRequest != null) {
historyEntry.field("request");
AlertUtils.writeSearchRequest(searchRequest, historyEntry);
AlertUtils.writeSearchRequest(searchRequest, historyEntry, params);
}
if (searchResponse != null) {
ByteArrayOutputStream out = new ByteArrayOutputStream();

@ -23,20 +23,20 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class TriggerManager extends AbstractComponent {
private static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
private volatile ImmutableOpenMap<String, TriggerFactory> triggersImplemented;
private final Client client;
private final String fireTimePlaceHolder;
private final String scheduledFireTimePlaceHolder;
private volatile ImmutableOpenMap<String, TriggerFactory> triggersImplemented;
@Inject
public TriggerManager(Settings settings, Client client, ScriptService scriptService) {
@ -60,11 +60,10 @@ public class TriggerManager extends AbstractComponent {
.build();
}
/**
* Reads the contents of parser to create the correct Trigger
* @param parser The parser containing the trigger definition
* @return
* @return a new AlertTrigger instance from the parser
* @throws IOException
*/
public AlertTrigger instantiateAlertTrigger(XContentParser parser) throws IOException {
@ -103,7 +102,13 @@ public class TriggerManager extends AbstractComponent {
}
SearchResponse response = client.search(request).actionGet(); // actionGet deals properly with InterruptedException
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
if (logger.isDebugEnabled()) {
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {
logger.debug("Hit: {}", XContentHelper.toString(hit));
}
}
return isTriggered(alert.trigger(), request, response);
}
@ -135,26 +140,27 @@ public class TriggerManager extends AbstractComponent {
} else if (Strings.hasLength(alert.getSearchRequest().templateSource())) {
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(alert.getSearchRequest().templateSource(), false);
Map<String, Object> templateSourceAsMap = tuple.v2();
Map<String, Object> templateObject = (Map<String, Object>) templateSourceAsMap.get("template");
if (templateObject != null) {
Map<String, Object> params = (Map<String, Object>) templateObject.get("params");
params.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime));
params.put("fire_time", dateTimeFormatter.printer().print(fireTime));
XContentBuilder builder = XContentFactory.contentBuilder(tuple.v1());
builder.map(templateSourceAsMap);
triggerSearchRequest.templateSource(builder.bytes(), false);
@SuppressWarnings("unchecked")
Map<String, Object> params = (Map<String, Object>) templateSourceAsMap.get("params");
if (params == null) {
templateSourceAsMap.put("params", params = new HashMap<>());
}
params.put("SCHEDULED_FIRE_TIME", dateTimeFormatter.printer().print(scheduledFireTime));
params.put("FIRE_TIME", dateTimeFormatter.printer().print(fireTime));
XContentBuilder builder = XContentFactory.contentBuilder(tuple.v1());
builder.map(templateSourceAsMap);
triggerSearchRequest.templateSource(builder.bytes(), false);
} else if (alert.getSearchRequest().templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(alert.getSearchRequest().templateParams())
.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime))
.put("fire_time", dateTimeFormatter.printer().print(fireTime));
.put("SCHEDULED_FIRE_TIME", dateTimeFormatter.printer().print(scheduledFireTime))
.put("FIRE_TIME", dateTimeFormatter.printer().print(fireTime));
triggerSearchRequest.templateParams(templateParams.map());
triggerSearchRequest.templateName(alert.getSearchRequest().templateName());
triggerSearchRequest.templateType(alert.getSearchRequest().templateType());
} else {
throw new ElasticsearchIllegalStateException("Search requests needs either source, template source or template name");
}
return triggerSearchRequest;
}
}

@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
@ -76,7 +77,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
builder.field("enable", true);
builder.field("request");
AlertUtils.writeSearchRequest(request, builder);
AlertUtils.writeSearchRequest(request, builder, ToXContent.EMPTY_PARAMS);
builder.startObject("trigger");
builder.startObject("script");
@ -133,15 +134,6 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertThat(routingTable, notNullValue());
assertThat(routingTable.allPrimaryShardsActive(), is(true));
// SearchResponse k = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
// .setIndicesOptions(IndicesOptions.lenientExpandOpen())
// .setQuery(boolQuery().must(matchQuery("alert_name", alertName)))
// .get();
// System.out.println("KK: " + k.getHits().getTotalHits());
// for (SearchHit hit : k.getHits()) {
// System.out.println("Hit " + XContentHelper.toString(hit));
// }
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.NO_ACTION_NEEDED.toString())))

@ -7,12 +7,12 @@ package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.client.AlertsClientInterface;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.alerts.transport.actions.index.IndexAlertResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -119,67 +119,77 @@ public class BasicAlertingTest extends AbstractAlertingTests {
}
}
@Test
public void testTriggerSearch() throws Exception {
assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true", "event_type", "type=string"));
private final SearchSourceBuilder searchSourceBuilder = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-30s").to("{{SCHEDULED_FIRE_TIME}}"))
);
SearchSourceBuilder searchSource = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-30s").to("{{SCHEDULED_FIRE_TIME}}"))
@Test
public void testTriggerSearchWithSourceSubstituted() throws Exception {
testTriggerSearch(
new SearchRequest("my-index").source(searchSourceBuilder)
);
}
@Test
public void testTriggerSearchWithTemplateSource() throws Exception {
testTriggerSearch(
new SearchRequest("my-index")
.templateSource(jsonBuilder().startObject().field("template").value(searchSourceBuilder).endObject().string())
);
}
@Test
public void testTriggerSearchWithIndexedTemplate() throws Exception {
client().preparePutIndexedScript()
.setScriptLang("mustache")
.setId("my-template")
.setSource(jsonBuilder().startObject().field("template").value(searchSource).endObject())
.setSource(jsonBuilder().startObject().field("template").value(searchSourceBuilder).endObject())
.get();
SearchRequest searchRequest = new SearchRequest("my-index");
searchRequest.templateName("my-template");
searchRequest.templateType(ScriptService.ScriptType.INDEXED);
testTriggerSearch(searchRequest);
}
private void testTriggerSearch(SearchRequest request) throws Exception {
long scheduleTimeInMs = 5000;
String alertName = "red-alert";
assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true", "event_type", "type=string"));
alertClient().prepareDeleteAlert(alertName).get();
alertClient().prepareIndexAlert(alertName)
.setAlertSource(createAlertSource(String.format("0/%s * * * * ? *", (scheduleTimeInMs / 1000)), request, "return hits.total >= 3"))
.get();
String alertName = "red-alert";
long scheduleTimeInMs = 5000;
SearchRequest[] searchRequests = new SearchRequest[]{
new SearchRequest("my-index").source(searchSource)
// client().prepareSearch("my-index").setTemplateName("my-template").request()
// TODO: add template source based search requests
};
long time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "a")
.get();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "a")
.get();
long timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertNoAlertTrigger(alertName, 1);
for (SearchRequest request : searchRequests) {
logger.info("Running: {}", request);
// A clean start. no data to trigger on and alert actions
cluster().wipeIndices(AlertActionManager.ALERT_HISTORY_INDEX, "my-index");
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "b")
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertNoAlertTrigger(alertName, 2);
alertClient().prepareDeleteAlert(alertName).get();
alertClient().prepareIndexAlert(alertName)
.setAlertSource(createAlertSource(String.format("0/%s * * * * ? *", (scheduleTimeInMs / 1000)), request, "return hits.total >= 3"))
.get();
long time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "a")
.get();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "a")
.get();
long timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertNoAlertTrigger(alertName, 1);
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "b")
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertNoAlertTrigger(alertName, 2);
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "a")
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertAlertTriggered(alertName, 1);
}
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
.setCreate(true)
.setSource("event_type", "a")
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertAlertTriggered(alertName, 1);
}
}

@ -103,7 +103,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
SearchRequest searchRequest = new SearchRequest("test123");
searchRequest.writeTo(out);
builder.field(AlertActionManager.REQUEST);
AlertUtils.writeSearchRequest(searchRequest, builder);
AlertUtils.writeSearchRequest(searchRequest, builder, ToXContent.EMPTY_PARAMS);
SearchResponse searchResponse = new SearchResponse(
new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false),
null, 1, 1, 0, new ShardSearchFailure[0]