Updated watch_history template to better support aggregations
- Added `dynamic_templates` to the index template that make sure email addresses and the http `path` and `host` fields under the `watch_execution` object are not analyzed. This enables building aggregations over these fields. Also made sure that the different time fields in the `watch_record` are mapped as date types.
- While at it, changed the `watch_execution.input_result` mapping so that it is enabled and only `watch_execution.input_result.payload` is disabled, as payloads from different sources may conflict with one another in terms of mappings.
- Fixed a bug in the `EmailTemplate` building of the `reply_to` field.
- Added the missing `execution_time` to the `watch_record` (under the `watch_execution` object).

Closes elastic/elasticsearch#335

Original commit: elastic/x-pack-elasticsearch@dd28c70bca
This commit is contained in:
parent 7d8dc38c29
commit dc6235a90a
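To illustrate what the not_analyzed mappings added here enable, below is a minimal sketch of a terms aggregation over one of the recorded email address fields. It assumes the integration-test context used further down (`client()` and `logger` come from `AbstractWatcherIntegrationTests`, `terms(...)` is the static `AggregationBuilders.terms` import), and the field name is taken from the new tests in this commit; it is an illustration, not part of the change itself:

    // Sketch only: count watch history records per "to" address of the _email action result.
    // Works because the email_not_analyzed_fields dynamic template maps these fields as not_analyzed.
    SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*")
            .addAggregation(terms("to").field("watch_execution.actions_results._email.email.email.to"))
            .get();
    Terms toAddresses = response.getAggregations().get("to");
    for (Terms.Bucket bucket : toAddresses.getBuckets()) {
        logger.info("address [{}] appears in [{}] watch records", bucket.getKey(), bucket.getDocCount());
    }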
@@ -5,7 +5,6 @@
 */
package org.elasticsearch.watcher.actions.email.service;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

@@ -250,7 +249,11 @@ public class EmailTemplate implements ToXContent {
        }

        public Builder replyTo(String... replyTo) {
            return replyTo(new Template(Strings.arrayToCommaDelimitedString(replyTo)));
            Template[] templates = new Template[replyTo.length];
            for (int i = 0; i < templates.length; i++) {
                templates[i] = new Template(replyTo[i]);
            }
            return replyTo(templates);
        }

        public Builder replyTo(Template... replyTo) {
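To make the `reply_to` fix above concrete, here is a small sketch of the behavioural difference (the addresses are illustrative; `Template` and `Strings` are the classes used in the diff):

    String[] addresses = {"rt1@example.com", "rt2@example.com"};

    // Old behaviour (the bug): all addresses were collapsed into a single template, so the
    // watch record ended up with one value "rt1@example.com,rt2@example.com" for reply_to.
    Template joined = new Template(Strings.arrayToCommaDelimitedString(addresses));

    // New behaviour: one template per address, so every address is kept as its own value
    // and can be aggregated on individually in the watch history.
    Template[] perAddress = new Template[addresses.length];
    for (int i = 0; i < addresses.length; i++) {
        perAddress[i] = new Template(addresses[i]);
    }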
@@ -26,9 +26,13 @@ public class WatcherDateUtils {
    private WatcherDateUtils() {
    }

    public static DateTime parseDate(String dateAsText) {
        return parseDate(dateAsText, null);
    }

    public static DateTime parseDate(String format, DateTimeZone timeZone) {
        DateTime dateTime = dateTimeFormatter.parser().parseDateTime(format);
        return dateTime.toDateTime(timeZone);
        return timeZone != null ? dateTime.toDateTime(timeZone) : dateTime;
    }

    public static String formatDate(DateTime date) {
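A brief sketch of the intended behaviour of the fixed `parseDate` (the date string is illustrative):

    // With an explicit zone the parsed date is converted to that zone; with a null zone it is
    // now returned exactly as parsed instead of being shifted to a default zone.
    DateTime inUtc = WatcherDateUtils.parseDate("2015-04-29T10:00:00.000Z", DateTimeZone.UTC);
    DateTime asParsed = WatcherDateUtils.parseDate("2015-04-29T10:00:00.000Z", null);
    DateTime viaOverload = WatcherDateUtils.parseDate("2015-04-29T10:00:00.000Z"); // same as passing null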
@@ -7,6 +7,7 @@ package org.elasticsearch.watcher.watch;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

@@ -20,17 +21,21 @@ import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.support.WatcherDateUtils;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformRegistry;

import java.io.IOException;

import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;

/**
 *
 */
public class WatchExecutionResult implements ToXContent {

    private final DateTime executionTime;
    private final Input.Result inputResult;
    private final Condition.Result conditionResult;
    private final Throttler.Result throttleResult;

@@ -38,10 +43,11 @@ public class WatchExecutionResult implements ToXContent {
    private final ExecutableActions.Results actionsResults;

    public WatchExecutionResult(WatchExecutionContext context) {
        this(context.inputResult(), context.conditionResult(), context.throttleResult(), context.transformResult(), context.actionsResults());
        this(context.executionTime(), context.inputResult(), context.conditionResult(), context.throttleResult(), context.transformResult(), context.actionsResults());
    }

    WatchExecutionResult(Input.Result inputResult, Condition.Result conditionResult, Throttler.Result throttleResult, @Nullable Transform.Result transformResult, ExecutableActions.Results actionsResults) {
    WatchExecutionResult(DateTime executionTime, Input.Result inputResult, Condition.Result conditionResult, Throttler.Result throttleResult, @Nullable Transform.Result transformResult, ExecutableActions.Results actionsResults) {
        this.executionTime = executionTime;
        this.inputResult = inputResult;
        this.conditionResult = conditionResult;
        this.throttleResult = throttleResult;

@@ -49,6 +55,10 @@ public class WatchExecutionResult implements ToXContent {
        this.actionsResults = actionsResults;
    }

    public DateTime executionTime() {
        return executionTime;
    }

    public Input.Result inputResult() {
        return inputResult;
    }

@@ -72,6 +82,12 @@ public class WatchExecutionResult implements ToXContent {
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();

        if (builder.humanReadable()) {
            builder.field(Parser.EXECUTION_TIME_FIELD.getPreferredName(), WatcherDateUtils.formatDate(executionTime));
        } else {
            builder.field(Parser.EXECUTION_TIME_FIELD.getPreferredName(), executionTime.getMillis());
        }
        if (inputResult != null) {
            builder.startObject(Parser.INPUT_RESULT_FIELD.getPreferredName())
                    .field(inputResult.type(), inputResult, params)

@@ -104,6 +120,7 @@ public class WatchExecutionResult implements ToXContent {

    public static class Parser {

        public static final ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time");
        public static final ParseField INPUT_RESULT_FIELD = new ParseField("input_result");
        public static final ParseField CONDITION_RESULT_FIELD = new ParseField("condition_result");
        public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results");

@@ -112,6 +129,7 @@ public class WatchExecutionResult implements ToXContent {

        public static WatchExecutionResult parse(Wid wid, XContentParser parser, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry,
                                                 InputRegistry inputRegistry, TransformRegistry transformRegistry) throws IOException {
            DateTime executionTime = null;
            boolean throttled = false;
            String throttleReason = null;
            ExecutableActions.Results actionResults = null;

@@ -129,8 +147,20 @@ public class WatchExecutionResult implements ToXContent {
                        throttleReason = parser.text();
                    } else if (THROTTLED.match(currentFieldName)) {
                        throttled = parser.booleanValue();
                    } else if (EXECUTION_TIME_FIELD.match(currentFieldName)) {
                        if (token == XContentParser.Token.VALUE_STRING) {
                            try {
                                executionTime = WatcherDateUtils.parseDate(parser.text());
                            } catch (IllegalArgumentException iae) {
                                throw new WatcherException("unable to parse watch execution [{}]. failed to parse date field [{}]", iae, wid, currentFieldName);
                            }
                        } else if (token == XContentParser.Token.VALUE_NUMBER){
                            executionTime = new DateTime(parser.longValue(), UTC);
                        } else {
                            throw new WatcherException("unable to parse watch execution [{}]. failed to parse date field [{}]. expected either a string or a numeric value", wid, currentFieldName);
                        }
                    } else {
                        throw new WatcherException("unable to parse watch execution. unexpected field [" + currentFieldName + "]");
                        throw new WatcherException("unable to parse watch execution [{}]. unexpected field [{}]", wid, currentFieldName);
                    }
                } else if (token == XContentParser.Token.START_OBJECT) {
                    if (INPUT_RESULT_FIELD.match(currentFieldName)) {

@@ -149,8 +179,11 @@ public class WatchExecutionResult implements ToXContent {
                }
            }

            if (executionTime == null) {
                throw new WatcherException("unable to parse watch execution [{}]. missing required date field [{}]", wid, EXECUTION_TIME_FIELD.getPreferredName());
            }
            Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.Result.NO;
            return new WatchExecutionResult(inputResult, conditionResult, throttleResult, transformResult, actionResults);
            return new WatchExecutionResult(executionTime, inputResult, conditionResult, throttleResult, transformResult, actionResults);
        }
    }
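As a rough sketch of the round trip the new `execution_time` field goes through (the millis value is illustrative, and it assumes `formatDate` and `parseDate` share the same formatter, which the diff suggests but does not show in full): a human readable builder writes it as a formatted date string, otherwise it is written as epoch millis, and the parser above accepts either form:

    // Numeric form: what toXContent writes via getMillis(), parsed back into a UTC DateTime.
    long millis = 1430301600000L;
    DateTime parsedFromNumber = new DateTime(millis, DateTimeZone.UTC);

    // String form: what a humanReadable builder writes, parsed back via WatcherDateUtils.
    String formatted = WatcherDateUtils.formatDate(parsedFromNumber);
    DateTime parsedFromString = WatcherDateUtils.parseDate(formatted);

    assert parsedFromNumber.getMillis() == parsedFromString.getMillis();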
@@ -9,6 +9,48 @@
  },
  "mappings": {
    "watch_record": {
      "dynamic_templates" : [
        {
          "email_not_analyzed_fields": {
            "path_match": ".*\\.email\\.(from|to|cc|bcc|reply_to|id)",
            "match_pattern" : "regex",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        },
        {
          "time_fields": {
            "path_match": ".*(execution|triggered|scheduled)_time",
            "match_pattern" : "regex",
            "mapping": {
              "type": "date"
            }
          }
        },
        {
          "http_not_analyzed_fields": {
            "path_match": "watch_execution\\.((actions_results\\..+\\.webhook\\.request)|input_result\\.http\\.sent_request)\\.(path|host)",
            "match_pattern" : "regex",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        },
        {
          "disabled_fields" : {
            "path_match": "watch_execution\\.input_result\\..+\\.payload",
            "match_pattern" : "regex",
            "mapping": {
              "type" : "object",
              "enabled" : false
            }
          }
        }
      ],
      "dynamic" : "strict",
      "_all" : {
        "enabled" : false

@@ -39,13 +81,7 @@
      },
      "watch_execution" : {
        "type" : "object",
        "dynamic" : true,
        "properties" : {
          "input_result" : {
            "type" : "object",
            "enabled" : false
          }
        }
        "dynamic" : true
      },
      "metadata" : {
        "type" : "object",
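To sanity check which field paths the regex based dynamic templates above are meant to catch, here is a small standalone sketch (the class name is purely illustrative; the field paths are the ones exercised by the new tests below):

    import java.util.regex.Pattern;

    public class DynamicTemplatePatternsCheck {
        public static void main(String[] args) {
            // the same regexes as in the template, written as Java string literals
            Pattern timeFields = Pattern.compile(".*(execution|triggered|scheduled)_time");
            Pattern emailFields = Pattern.compile(".*\\.email\\.(from|to|cc|bcc|reply_to|id)");

            // mapped as date by the time_fields template
            System.out.println(timeFields.matcher("watch_execution.execution_time").matches());        // true
            System.out.println(timeFields.matcher("trigger_event.schedule.triggered_time").matches()); // true

            // mapped as a not_analyzed string by the email_not_analyzed_fields template
            System.out.println(emailFields.matcher(
                    "watch_execution.actions_results._email.email.email.reply_to").matches());         // true
        }
    }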
@@ -370,6 +370,29 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
        });
    }

    protected void assertWatchWithMinimumActionsCount(final String watchName, final WatchRecord.State recordState, final long recordCount) throws Exception {
        assertBusy(new Runnable() {
            @Override
            public void run() {
                ClusterState state = client().admin().cluster().prepareState().get().getState();
                String[] watchHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
                assertThat(watchHistoryIndices, not(emptyArray()));
                for (String index : watchHistoryIndices) {
                    IndexRoutingTable routingTable = state.getRoutingTable().index(index);
                    assertThat(routingTable, notNullValue());
                    assertThat(routingTable.allPrimaryShardsActive(), is(true));
                }

                refresh();
                SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*")
                        .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                        .setQuery(boolQuery().must(matchQuery("watch_id", watchName)).must(matchQuery("state", recordState.id())))
                        .get();
                assertThat("could not find executed watch record", searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(recordCount));
            }
        });
    }

    protected void ensureWatcherStarted() throws Exception {
        ensureWatcherStarted(true);
    }
@@ -0,0 +1,156 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.watcher.test.integration;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.watcher.actions.email.service.EmailTemplate;
import org.elasticsearch.watcher.actions.email.service.support.EmailServer;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.junit.After;
import org.junit.Test;

import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.actions.ActionBuilders.emailAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
 * This test makes sure that the email address fields in the watch_record action result are
 * not analyzed so they can be used in aggregations
 */
public class HistoryTemplateEmailMappingsTests extends AbstractWatcherIntegrationTests {

    static final String USERNAME = "_user";
    static final String PASSWORD = "_passwd";

    private EmailServer server;

    @After
    public void cleanup() throws Exception {
        server.stop();
    }

    @Override
    protected boolean timeWarped() {
        return true; // just to have better control over the triggers
    }

    @Override
    protected boolean enableShield() {
        return false; // remove shield noise from this test
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        if(server == null) {
            //Need to construct the Email Server here as this happens before init()
            server = EmailServer.localhost("2500-2600", USERNAME, PASSWORD, logger);
        }
        return ImmutableSettings.builder()
                .put(super.nodeSettings(nodeOrdinal))
                .put("script.disable_dynamic", false)

                // email
                .put("watcher.actions.email.service.account.test.smtp.auth", true)
                .put("watcher.actions.email.service.account.test.smtp.user", USERNAME)
                .put("watcher.actions.email.service.account.test.smtp.password", PASSWORD)
                .put("watcher.actions.email.service.account.test.smtp.port", server.port())
                .put("watcher.actions.email.service.account.test.smtp.host", "localhost")

                .build();
    }

    @Test
    public void testEmailFields() throws Exception {
        PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
                .trigger(schedule(interval("5s")))
                .input(simpleInput())
                .condition(alwaysCondition())
                .addAction("_email", emailAction(EmailTemplate.builder()
                        .from("from@example.com")
                        .to("to1@example.com", "to2@example.com")
                        .cc("cc1@example.com", "cc2@example.com")
                        .bcc("bcc1@example.com", "bcc2@example.com")
                        .replyTo("rt1@example.com", "rt2@example.com")
                        .subject("_subject")
                        .textBody("_body"))))
                .get();

        assertThat(putWatchResponse.isCreated(), is(true));
        timeWarp().scheduler().trigger("_id");
        flush();
        refresh();

        // the action should fail as no email server is available
        assertWatchWithMinimumActionsCount("_id", WatchRecord.State.EXECUTED, 1);

        SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSource(searchSource()
                .aggregation(terms("from").field("watch_execution.actions_results._email.email.email.from"))
                .aggregation(terms("to").field("watch_execution.actions_results._email.email.email.to"))
                .aggregation(terms("cc").field("watch_execution.actions_results._email.email.email.cc"))
                .aggregation(terms("bcc").field("watch_execution.actions_results._email.email.email.bcc"))
                .aggregation(terms("reply_to").field("watch_execution.actions_results._email.email.email.reply_to"))
                .buildAsBytes())
                .get();

        assertThat(response, notNullValue());
        assertThat(response.getHits().getTotalHits(), is(1L));
        Aggregations aggs = response.getAggregations();
        assertThat(aggs, notNullValue());

        Terms terms = aggs.get("from");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(1));
        assertThat(terms.getBucketByKey("from@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("from@example.com").getDocCount(), is(1L));

        terms = aggs.get("to");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(2));
        assertThat(terms.getBucketByKey("to1@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("to1@example.com").getDocCount(), is(1L));
        assertThat(terms.getBucketByKey("to2@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("to2@example.com").getDocCount(), is(1L));

        terms = aggs.get("cc");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(2));
        assertThat(terms.getBucketByKey("cc1@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("cc1@example.com").getDocCount(), is(1L));
        assertThat(terms.getBucketByKey("cc2@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("cc2@example.com").getDocCount(), is(1L));

        terms = aggs.get("bcc");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(2));
        assertThat(terms.getBucketByKey("bcc1@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("bcc1@example.com").getDocCount(), is(1L));
        assertThat(terms.getBucketByKey("bcc2@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("bcc2@example.com").getDocCount(), is(1L));

        terms = aggs.get("reply_to");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(2));
        assertThat(terms.getBucketByKey("rt1@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("rt1@example.com").getDocCount(), is(1L));
        assertThat(terms.getBucketByKey("rt2@example.com"), notNullValue());
        assertThat(terms.getBucketByKey("rt2@example.com").getDocCount(), is(1L));
    }
}
@@ -0,0 +1,123 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.watcher.test.integration;

import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.QueueDispatcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.BindException;

import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.actions.ActionBuilders.webhookAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
 * This test makes sure that the http host and path fields in the watch_record action result are
 * not analyzed so they can be used in aggregations
 */
public class HistoryTemplateHttpMappingsTests extends AbstractWatcherIntegrationTests {

    private int webPort;
    private MockWebServer webServer;

    @Before
    public void init() throws Exception {
        for (webPort = 9200; webPort < 9300; webPort++) {
            try {
                webServer = new MockWebServer();
                QueueDispatcher dispatcher = new QueueDispatcher();
                dispatcher.setFailFast(true);
                webServer.setDispatcher(dispatcher);
                webServer.start(webPort);
                return;
            } catch (BindException be) {
                logger.warn("port [{}] was already in use trying next port", webPort);
            }
        }
        throw new ElasticsearchException("unable to find open port between 9200 and 9300");
    }

    @After
    public void cleanup() throws Exception {
        webServer.shutdown();
    }

    @Override
    protected boolean timeWarped() {
        return true; // just to have better control over the triggers
    }

    @Override
    protected boolean enableShield() {
        return false; // remove shield noise from this test
    }

    @Test
    public void testHttpFields() throws Exception {
        PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
                .trigger(schedule(interval("5s")))
                .input(httpInput(HttpRequestTemplate.builder("localhost", webPort).path("/input/path")))
                .condition(alwaysCondition())
                .addAction("_webhook", webhookAction(HttpRequestTemplate.builder("localhost", webPort)
                        .path("/webhook/path")
                        .body("_body"))))
                .get();

        webServer.enqueue(new MockResponse().setResponseCode(200).setBody("{}"));

        assertThat(putWatchResponse.isCreated(), is(true));
        timeWarp().scheduler().trigger("_id");
        flush();
        refresh();

        // the action should fail as no email server is available
        assertWatchWithMinimumActionsCount("_id", WatchRecord.State.EXECUTED, 1);

        SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSource(searchSource()
                .aggregation(terms("input_result_path").field("watch_execution.input_result.http.sent_request.path"))
                .aggregation(terms("webhook_path").field("watch_execution.actions_results._webhook.webhook.request.path"))
                .buildAsBytes())
                .get();

        assertThat(response, notNullValue());
        assertThat(response.getHits().getTotalHits(), is(1L));
        Aggregations aggs = response.getAggregations();
        assertThat(aggs, notNullValue());

        Terms terms = aggs.get("input_result_path");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(1));
        assertThat(terms.getBucketByKey("/input/path"), notNullValue());
        assertThat(terms.getBucketByKey("/input/path").getDocCount(), is(1L));

        terms = aggs.get("webhook_path");
        assertThat(terms, notNullValue());
        assertThat(terms.getBuckets().size(), is(1));
        assertThat(terms.getBucketByKey("/webhook/path"), notNullValue());
        assertThat(terms.getBucketByKey("/webhook/path").getDocCount(), is(1L));
    }
}
@@ -0,0 +1,78 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.watcher.test.integration;

import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.junit.Test;

import java.util.Map;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
 * This test makes sure that the different time fields in the watch_record are mapped as date types
 */
public class HistoryTemplateTimeMappingsTests extends AbstractWatcherIntegrationTests {

    @Override
    protected boolean timeWarped() {
        return true; // just to have better control over the triggers
    }

    @Override
    protected boolean enableShield() {
        return false; // remove shield noise from this test
    }

    @Test
    public void testTimeFields() throws Exception {
        PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
                .trigger(schedule(interval("5s")))
                .input(simpleInput())
                .condition(alwaysCondition())
                .addAction("_logging", loggingAction("foobar")))
                .get();

        assertThat(putWatchResponse.isCreated(), is(true));
        timeWarp().scheduler().trigger("_id");
        flush();
        refresh();

        // the action should fail as no email server is available
        assertWatchWithMinimumActionsCount("_id", WatchRecord.State.EXECUTED, 1);

        GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
        assertThat(mappingsResponse, notNullValue());
        assertThat(mappingsResponse.getMappings().isEmpty(), is(false));
        for (ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> metadatas : mappingsResponse.getMappings()) {
            if (!metadatas.key.startsWith(".watch_history")) {
                continue;
            }
            MappingMetaData metadata = metadatas.value.get("watch_record");
            assertThat(metadata, notNullValue());
            Map<String, Object> source = metadata.getSourceAsMap();
            assertThat(extractValue("properties.trigger_event.properties.schedule.properties.scheduled_time.type", source), is((Object) "date"));
            assertThat(extractValue("properties.trigger_event.properties.schedule.properties.triggered_time.type", source), is((Object) "date"));
            assertThat(extractValue("properties.watch_execution.properties.execution_time.type", source), is((Object) "date"));
        }
    }
}
@@ -5,6 +5,7 @@
 */
package org.elasticsearch.watcher.test.integration;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.QueueDispatcher;

@@ -44,7 +45,7 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTests {

    @Before
    public void startWebservice() throws Exception {
        for (webPort = 9200; webPort < 9300; webPort++) {
        for (webPort = 9250; webPort < 9300; webPort++) {
            try {
                webServer = new MockWebServer();
                QueueDispatcher dispatcher = new QueueDispatcher();

@@ -64,7 +65,7 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTests {
        webServer.shutdown();
    }

    @Test
    @Test @Repeat(iterations = 20)
    public void testWebhook() throws Exception {
        webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
        HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webPort)