From 71aa3a8059c502e372785b465d4aedcf7761c49c Mon Sep 17 00:00:00 2001 From: uboness Date: Sat, 16 May 2015 00:55:23 +0200 Subject: [PATCH] Added Dynamic Index Names Support The search input/transform rely on users configuring the search requests. Sometimes (often), these search requests are executed on time-based indices. The problem the Until now, there's no way to define dynamic index names that are bound to time, which forces the request to search all the indices (for example, the Marvel watches se This commit adds dynamic index name resolution. It works in the following way: - and index name can be a simple string (indicating the static/absolute index name you're searching, incl. wildcards) - an index name can also be a template. The template is enclosed within `<` and `>` (these are officially illegal characters for index names, so these are safe to use) - the template can have both static parts to it and place holder parts. The place holders are enclosed within `{` and `}`. The place holder holds `date math` expression * `"<.marvel-{now}>"` will resolve to `".marvel-2022.03.03"` (the default date format is `YYYY.MM.dd`) * `"<.marvel-{now/M}>"` will resolve to `".marvel-2022.03.01"` * `"<.marvel-{now{YYYY.MM}}>"` will resolve to `".marvel-2022.03"` (this one has a custom date format - `YYYY.MM`) * `"<.marvel-{now/M-1M{YYYY.MM}}>"` will resolve to `".marvel-2022.02"` The following is an example of a search input that searches marvel indices for the last 3 days (relies on the default Marvel indices format - `.marvel-YYYY.MM.dd`): ``` { ... "input" : { "search" : { "request" : { "indices" : [ "<.marvel-{now/d-2d}>", "<.marvel-{now/d-1d}>", "<.marvel-{now/d}>" ], ... } } } ... } ``` - `index` action was also updated to work with a dynamic index name (e.g. it's possible to index into daily indices by setting the index name to ``) Original commit: elastic/x-pack-elasticsearch@9c15a96029a594ffa4a9ddc011cc85ee7541e0f5 --- .../elasticsearch/watcher/WatcherModule.java | 2 + .../actions/index/ExecutableIndexAction.java | 16 +- .../watcher/actions/index/IndexAction.java | 11 +- .../actions/index/IndexActionFactory.java | 8 +- .../watcher/input/InputFactory.java | 2 +- .../input/search/ExecutableSearchInput.java | 14 +- .../watcher/input/search/SearchInput.java | 1 - .../input/search/SearchInputFactory.java | 6 +- .../watcher/support/DynamicIndexName.java | 317 ++++++++++++++++++ .../watcher/support/WatcherUtils.java | 10 +- .../search/ExecutableSearchTransform.java | 14 +- .../search/SearchTransformFactory.java | 6 +- src/main/resources/watch_history.json | 20 +- .../actions/index/IndexActionTests.java | 40 ++- .../input/search/SearchInputTests.java | 71 +++- .../DynamicIndexNameIntegrationTests.java | 134 ++++++++ .../support/DynamicIndexNameTests.java | 169 ++++++++++ .../test/AbstractWatcherIntegrationTests.java | 4 + .../watcher/test/WatcherTestUtils.java | 3 +- .../search/SearchTransformTests.java | 80 ++++- .../watcher/watch/WatchTests.java | 25 +- 21 files changed, 878 insertions(+), 75 deletions(-) create mode 100644 src/main/java/org/elasticsearch/watcher/support/DynamicIndexName.java create mode 100644 src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameIntegrationTests.java create mode 100644 src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameTests.java diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java index 409baa766bd..34035515849 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java @@ -19,6 +19,7 @@ import org.elasticsearch.watcher.input.InputModule; import org.elasticsearch.watcher.license.LicenseModule; import org.elasticsearch.watcher.rest.WatcherRestModule; import org.elasticsearch.watcher.shield.WatcherShieldModule; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.clock.ClockModule; import org.elasticsearch.watcher.support.http.HttpClientModule; @@ -70,6 +71,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules { bind(WatcherLifeCycleService.class).asEagerSingleton(); bind(TemplateUtils.class).asEagerSingleton(); bind(WatcherSettingsValidation.class).asEagerSingleton(); + bind(DynamicIndexName.Parser.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java index ca69765e0f9..885baa613dc 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.watcher.actions.Action; import org.elasticsearch.watcher.actions.ExecutableAction; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.support.ArrayObjectIterator; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.xcontent.XContentSource; @@ -32,10 +33,16 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public class ExecutableIndexAction extends ExecutableAction { private final ClientProxy client; + private final DynamicIndexName indexName; - public ExecutableIndexAction(IndexAction action, ESLogger logger, ClientProxy client) { + public ExecutableIndexAction(IndexAction action, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) { super(action, logger); this.client = client; + this.indexName = indexNameParser.parse(action.index); + } + + DynamicIndexName indexName() { + return indexName; } @Override @@ -57,7 +64,8 @@ public class ExecutableIndexAction extends ExecutableAction { } IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(action.index); + + indexRequest.index(indexName.name(ctx.executionTime())); indexRequest.type(action.docType); if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) { @@ -72,7 +80,7 @@ public class ExecutableIndexAction extends ExecutableAction { indexRequest.source(jsonBuilder().prettyPrint().map(data)); if (ctx.simulateAction(actionId)) { - return new IndexAction.Result.Simulated(action.index, action.docType, new XContentSource(indexRequest.source(), XContentType.JSON)); + return new IndexAction.Result.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(), XContentType.JSON)); } IndexResponse response = client.index(indexRequest); @@ -89,7 +97,7 @@ public class ExecutableIndexAction extends ExecutableAction { } Map doc = (Map) item; IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(action.index); + indexRequest.index(indexName.name(ctx.executionTime())); indexRequest.type(action.docType); if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) { if (!(doc instanceof HashMap)) { diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java index 269cf1c80ce..1e562f2043f 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.actions.Action; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.xcontent.XContentSource; import java.io.IOException; @@ -91,10 +92,14 @@ public class IndexAction implements Action { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - if (Field.INDEX.match(currentFieldName)) { + } else if (Field.INDEX.match(currentFieldName)) { + try { index = parser.text(); - } else if (Field.DOC_TYPE.match(currentFieldName)) { + } catch (DynamicIndexName.ParseException pe) { + throw new IndexActionException("could not parse [{}] action [{}/{}]. failed to parse index name value for field [{}]", pe, TYPE, watchId, actionId, currentFieldName); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (Field.DOC_TYPE.match(currentFieldName)) { docType = parser.text(); } else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) { executionTimeField = parser.text(); diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java b/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java index 5ec3d1d9e18..990999157e6 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.actions.ActionFactory; import org.elasticsearch.watcher.actions.email.ExecutableEmailAction; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; @@ -21,11 +22,14 @@ import java.io.IOException; public class IndexActionFactory extends ActionFactory { private final ClientProxy client; + private final DynamicIndexName.Parser indexNamesParser; @Inject - public IndexActionFactory(Settings settings, ClientProxy client) { + public IndexActionFactory(Settings settings, ClientProxy client, DynamicIndexName.Parser indexNamesParser) { super(Loggers.getLogger(ExecutableEmailAction.class, settings)); this.client = client; + String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.actions.index"); + this.indexNamesParser = new DynamicIndexName.Parser(defaultDateFormat); } @Override @@ -40,6 +44,6 @@ public class IndexActionFactory extends ActionFactory { private final ClientProxy client; + private final DynamicIndexName.Parser indexNameParser; @Inject public SearchInputFactory(Settings settings, ClientProxy client) { super(Loggers.getLogger(ExecutableSimpleInput.class, settings)); this.client = client; + String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.input.search"); + this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat); } @Override @@ -40,6 +44,6 @@ public class SearchInputFactory extends InputFactory expressions = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + boolean inPlaceHolder = false; + boolean inDateFormat = false; + boolean escape = false; + for (int i = from; i < length; i++) { + boolean escapedChar = escape; + if (escape) { + escape = false; + } + char c = text[i]; + + if (c == ESCAPE_CHAR) { + if (escapedChar) { + sb.append(c); + escape = false; + } else { + escape = true; + } + continue; + } + + if (inPlaceHolder) { + switch (c) { + + case LEFT_BOUND: + if (inDateFormat && escapedChar) { + sb.append(c); + } else if (!inDateFormat) { + inDateFormat = true; + sb.append(c); + } else { + throw new ParseException("invalid dynamic name expression [{}]. invalid character in placeholder at position [{}]", new String(text, from, length), i); + } + break; + + case RIGHT_BOUND: + if (inDateFormat && escapedChar) { + sb.append(c); + } else if (inDateFormat) { + inDateFormat = false; + sb.append(c); + } else { + expressions.add(new DateMathExpression(defaultDateFormat, sb.toString())); + sb = new StringBuilder(); + inPlaceHolder = false; + dynamic = true; + } + break; + + default: + sb.append(c); + } + } else { + switch (c) { + + case LEFT_BOUND: + if (escapedChar) { + sb.append(c); + } else { + expressions.add(new StaticExpression(sb.toString())); + sb = new StringBuilder(); + inPlaceHolder = true; + } + break; + + case RIGHT_BOUND: + if (!escapedChar) { + throw new ParseException("invalid dynamic name expression [{}]. invalid character at position [{}]. " + + "`{` and `}` are reserved characters and should be escaped when used as part of the index name using `\\` (e.g. `\\{text\\}`)", new String(text, from, length), i); + } + default: + sb.append(c); + } + } + } + if (inPlaceHolder) { + throw new ParseException("invalid dynamic name expression [{}]. date math placeholder is open ended", new String(text, from, length)); + } + if (sb.length() > 0) { + expressions.add(new StaticExpression(sb.toString())); + } + + if (!dynamic) { + // if all the expressions are static... lets optimize to a single static expression + sb = new StringBuilder(); + for (Expression expression : expressions) { + sb.append(((StaticExpression) expression).value); + } + return new StaticExpression(sb.toString()); + } + + if (expressions.size() == 1) { + return expressions.get(0); + } + + return new CompoundExpression(expressions.toArray(new Expression[expressions.size()])); + } + } + + static class DateMathExpression implements Expression { + + private final DateMathParser dateMathParser; + private final String mathExpression; + private final FormatDateTimeFormatter formatter; + + public DateMathExpression(String defaultFormat, String expression) { + int i = expression.indexOf(LEFT_BOUND); + if (i < 0) { + mathExpression = expression; + formatter = Joda.forPattern(defaultFormat); + } else { + if (expression.lastIndexOf(RIGHT_BOUND) != expression.length() - 1) { + throw new ParseException("invalid dynamic name expression [{}]. missing closing `}` for date math format", expression); + } + if (i == expression.length() - 2) { + throw new ParseException("invalid dynamic name expression [{}]. missing date format", expression); + } + mathExpression = expression.substring(0, i); + formatter = Joda.forPattern(expression.substring(i + 1, expression.length() - 1)); + } + dateMathParser = new DateMathParser(formatter); + } + + @Override + public String eval(final DateTime now) { + long millis = dateMathParser.parse(mathExpression, new Callable() { + @Override + public Long call() throws Exception { + return now.getMillis(); + } + }); + return formatter.printer().print(millis); + } + } + + public static class Parser { + + private final String defaultDateFormat; + + public Parser() { + this(DEFAULT_DATE_FORMAT); + } + + public Parser(String defaultDateFormat) { + this.defaultDateFormat = defaultDateFormat; + } + + public DynamicIndexName parse(String template) { + if (template == null) { + return null; + } + if (!template.startsWith(EXPRESSION_LEFT_BOUND) || !template.endsWith(EXPRESSION_RIGHT_BOUND)) { + return new DynamicIndexName(template, new StaticExpression(template)); + } + return new DynamicIndexName(template, CompoundExpression.parse(defaultDateFormat, template.toCharArray(), 1, template.length() - 1)); + } + + public DynamicIndexName[] parse(String[] templates) { + if (templates.length == 0) { + return null; + } + DynamicIndexName[] dynamicIndexNames = new DynamicIndexName[templates.length]; + for (int i = 0; i < dynamicIndexNames.length; i++) { + dynamicIndexNames[i] = parse(templates[i]); + } + return dynamicIndexNames; + } + + public DynamicIndexName parse(XContentParser parser) throws IOException { + if (parser.currentToken() != XContentParser.Token.VALUE_STRING) { + throw new ParseException("could not parse index name. expected a string value but found [{}] instead", parser.currentToken()); + } + return parse(parser.text()); + } + } + + public static class ParseException extends WatcherException { + + public ParseException(String msg, Object... args) { + super(msg, args); + } + + public ParseException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java b/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java index e7f5f2c74b9..08be3933e65 100644 --- a/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java +++ b/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java @@ -8,6 +8,7 @@ package org.elasticsearch.watcher.support; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; @@ -55,11 +56,16 @@ public final class WatcherUtils { } } - public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException { + public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, @Nullable DynamicIndexName[] dynamicIndexNames, WatchExecutionContext ctx, Payload payload) throws IOException { + + String[] indices = dynamicIndexNames == null ? + requestPrototype.indices() : + DynamicIndexName.names(dynamicIndexNames, ctx.executionTime()); + SearchRequest request = new SearchRequest(requestPrototype) .indicesOptions(requestPrototype.indicesOptions()) .searchType(requestPrototype.searchType()) - .indices(requestPrototype.indices()) + .indices(indices) .types(requestPrototype.types()); // TODO: Revise this search template conversion code once search templates in core have been refactored once ES 2.0 is released. diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java index ac4391afe8a..a063b092a83 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java @@ -8,8 +8,10 @@ package org.elasticsearch.watcher.transform.search; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.transform.ExecutableTransform; @@ -23,17 +25,24 @@ public class ExecutableSearchTransform extends ExecutableTransform { protected final ClientProxy client; + protected final DynamicIndexName.Parser indexNameParser; @Inject public SearchTransformFactory(Settings settings, ClientProxy client) { super(Loggers.getLogger(ExecutableSearchTransform.class, settings)); this.client = client; + String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.transform.search"); + this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat); } @Override @@ -39,6 +43,6 @@ public class SearchTransformFactory extends TransformFactory") + .field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type") + .endObject(); + + DynamicIndexName.Parser indexNameParser = new DynamicIndexName.Parser(); + IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()), indexNameParser); + XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); + parser.nextToken(); + + ExecutableIndexAction executable = actionParser.parseExecutable(randomAsciiOfLength(5), randomAsciiOfLength(3), parser); + + DateTime now = DateTime.now(UTC); + assertThat(executable, notNullValue()); + assertThat(executable.action().index, is("")); + assertThat(executable.indexName().name(now), is("idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now))); + } + @Test public void testParser_Failure() throws Exception { XContentBuilder builder = jsonBuilder(); @@ -225,7 +249,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest { } } builder.endObject(); - IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client())); + DynamicIndexName.Parser indexNameParser = new DynamicIndexName.Parser(); + IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()), indexNameParser); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); try { @@ -237,5 +262,4 @@ public class IndexActionTests extends ElasticsearchIntegrationTest { assertThat(useIndex && useType, equalTo(false)); } } - } diff --git a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index 8584dd11055..5e4d8bf7fce 100644 --- a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -5,21 +5,22 @@ */ package org.elasticsearch.watcher.input.search; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.plugins.PluginsService; -import org.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.actions.ExecutableActions; @@ -28,6 +29,7 @@ import org.elasticsearch.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; @@ -36,7 +38,8 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.WatchStatus; -import org.joda.time.DateTimeZone; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; import org.junit.Test; import java.io.IOException; @@ -51,18 +54,17 @@ import java.util.Map; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.filteredQuery; -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; +import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; import static org.elasticsearch.watcher.test.WatcherTestUtils.areJsonEquivalent; import static org.elasticsearch.watcher.test.WatcherTestUtils.getRandomSupportedSearchType; import static org.hamcrest.Matchers.*; +import static org.joda.time.DateTimeZone.UTC; /** */ -@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) public class SearchInputTests extends ElasticsearchIntegrationTest { private final static String TEMPLATE_QUERY = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + @@ -113,7 +115,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .request() .source(searchSourceBuilder); - ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client())); + ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); WatchExecutionContext ctx = new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), @@ -124,8 +126,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { new ExecutableActions(new ArrayList()), null, new WatchStatus(ImmutableMap.of())), - new DateTime(0, DateTimeZone.UTC), - new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)), + new DateTime(0, UTC), + new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), timeValueSeconds(5)); SearchInput.Result result = searchInput.execute(ctx); @@ -220,7 +222,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .request() .source(searchSourceBuilder); - ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client())); + ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); WatchExecutionContext ctx = new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), @@ -231,8 +233,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { new ExecutableActions(new ArrayList()), null, new WatchStatus(ImmutableMap.of())), - new DateTime(0, DateTimeZone.UTC), - new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)), + new DateTime(0, UTC), + new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), timeValueSeconds(5)); SearchInput.Result result = searchInput.execute(ctx); @@ -261,6 +263,43 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { assertEquals(SearchInput.TYPE, searchInput.type()); } + @Test + public void testParser_IndexNames() throws Exception { + SearchRequest request = client().prepareSearch() + .setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE) + .setIndices("test", "") + .request() + .source(searchSource() + .query(boolQuery().must(matchQuery("event_type", "a")).filter(rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")))); + + XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null)); + XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); + parser.nextToken(); + + String dateFormat; + Settings settings; + if (randomBoolean()) { + dateFormat = DynamicIndexName.DEFAULT_DATE_FORMAT; + settings = Settings.EMPTY; + } else { + dateFormat = "YYYY-MM"; + settings = Settings.builder() + .put("watcher.input.search.dynamic_indices.default_date_format", dateFormat) + .build(); + } + + SearchInputFactory factory = new SearchInputFactory(settings, ClientProxy.of(client())); + + ExecutableSearchInput executable = factory.parseExecutable("_id", parser); + DynamicIndexName[] indexNames = executable.indexNames(); + assertThat(indexNames, notNullValue()); + DateTime now = DateTime.now(UTC); + String[] names = DynamicIndexName.names(indexNames, now); + assertThat(names, notNullValue()); + assertThat(names.length, is(2)); + assertThat(names, arrayContaining("test", "test-" + DateTimeFormat.forPattern(dateFormat).print(now.withDayOfMonth(1).minusMonths(1)))); + } + @Test(expected = SearchInputException.class) public void testParser_ScanNotSupported() throws Exception { SearchRequest request = client().prepareSearch() @@ -290,8 +329,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { new ExecutableActions(new ArrayList()), null, new WatchStatus(ImmutableMap.of())), - new DateTime(60000, DateTimeZone.UTC), - new ScheduleTriggerEvent("test-watch", new DateTime(60000, DateTimeZone.UTC), new DateTime(60000, DateTimeZone.UTC)), + new DateTime(60000, UTC), + new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)), timeValueSeconds(5)); } @@ -302,7 +341,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { SearchInput si = siBuilder.build(); - ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client())); + ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); return searchInput.execute(ctx); } diff --git a/src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameIntegrationTests.java new file mode 100644 index 00000000000..4fc326b5a19 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameIntegrationTests.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.util.Callback; +import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; +import org.joda.time.format.DateTimeFormat; +import org.junit.Test; + +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; +import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition; +import static org.elasticsearch.watcher.input.InputBuilders.searchInput; +import static org.elasticsearch.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.is; + +/** + */ +public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegrationTests { + + @Override + protected boolean timeWarped() { + return true; + } + + @Override + protected boolean enableShield() { + return false; // reduce noise + } + + @Test + public void testDynamicIndexAction() throws Exception { + WatcherClient watcherClient = watcherClient(); + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id") + .setSource(watchBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .input(simpleInput("key", "value")) + .condition(alwaysCondition()) + .addAction("dynamic_index", indexAction("", "type"))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + timeWarp().scheduler().trigger("_id"); + refresh(); + + assertWatchWithMinimumPerformedActionsCount("_id", 1, false); + + final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().now()); + logger.info("checking index [{}]", indexName); + assertBusy(new Runnable() { + @Override + public void run() { + flush(); + refresh(); + long docCount = docCount(indexName, "type", matchAllQuery()); + assertThat(docCount, is(1L)); + } + }); + } + + @Test + public void testDynamicIndexSearchInput() throws Exception { + final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().now()); + createIndex(indexName); + index(indexName, "type", "1", "key", "value"); + flush(); + refresh(); + + WatcherClient watcherClient = watcherClient(); + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id") + .setSource(watchBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .input(searchInput(new SearchRequest("").types("type")))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + timeWarp().scheduler().trigger("_id"); + flush(); + refresh(); + + SearchResponse response = searchHistory(searchSource().query(matchQuery("result.input.search.request.indices", indexName))); + assertThat(response.getHits().getTotalHits(), is(1L)); + } + + @Test + public void testDynamicIndexSearchTransform() throws Exception { + final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().now()); + createIndex(indexName); + index(indexName, "type", "1", "key", "value"); + flush(); + refresh(); + + WatcherClient watcherClient = watcherClient(); + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id") + .setSource(watchBuilder() + .trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))) + .transform(searchTransform(new SearchRequest("").types("type"))) + .addAction("log", loggingAction("heya"))) + .get(); + + assertThat(putWatchResponse.isCreated(), is(true)); + + timeWarp().scheduler().trigger("_id"); + flush(); + refresh(); + + SearchResponse response = searchWatchRecords(new Callback() { + @Override + public void handle(SearchRequestBuilder builder) { + builder.setQuery(matchQuery("result.transform.search.request.indices", indexName)); + } + }); + assertThat(response.getHits().getTotalHits(), is(1L)); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameTests.java b/src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameTests.java new file mode 100644 index 00000000000..bcaaad3c584 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/DynamicIndexNameTests.java @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.*; + +/** + * + */ +public class DynamicIndexNameTests extends ElasticsearchTestCase { + + @Test + public void testNormal() throws Exception { + String indexName = randomAsciiOfLength(10); + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse(indexName); + String name = indexNames.name(now); + assertThat(name, equalTo(indexName)); + } + + @Test + public void testExpression() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse("<.marvel-{now}>"); + String name = indexNames.name(now); + assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now))); + } + + @Test + public void testNullOrEmpty() throws Exception { + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexName = parser.parse((String) null); + assertThat(indexName, nullValue()); + DynamicIndexName[] indexNames = parser.parse(Strings.EMPTY_ARRAY); + assertThat(indexNames, nullValue()); + } + + @Test + public void testExpression_Static() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse("<.marvel-test>"); + String name = indexNames.name(now); + assertThat(name, equalTo(".marvel-test")); + } + + @Test + public void testExpression_MultiParts() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse("<.text1-{now/d}-text2-{now/M}>"); + String name = indexNames.name(now); + assertThat(name, equalTo(".text1-" + + DateTimeFormat.forPattern("YYYY.MM.dd").print(now) + + "-text2-" + + DateTimeFormat.forPattern("YYYY.MM.dd").print(now.withDayOfMonth(1)))); + } + + @Test + public void testExpression_CustomFormat() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{YYYY.MM.dd}}>"); + String name = indexNames.name(now); + assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now))); + } + + @Test + public void testExpression_EscapeStatic() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse("<.mar\\{v\\}el-{now/d}>"); + String name = indexNames.name(now); + assertThat(name, equalTo(".mar{v}el-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now))); + } + + @Test + public void testExpression_EscapeDateFormat() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{'\\{year\\}'YYYY}}>"); + String name = indexNames.name(now); + assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("'{year}'YYYY").print(now))); + } + + @Test + public void testExpression_MixedArray() throws Exception { + DateTime now = DateTime.now(DateTimeZone.UTC); + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + DynamicIndexName[] indexNames = parser.parse(new String[] { + "name1", + "<.marvel-{now/d}>", + "name2", + "<.logstash-{now/M{YYYY.MM}}>" + }); + String[] names = new String[indexNames.length]; + for (int i = 0; i < names.length; i++) { + names[i] = indexNames[i].name(now); + } + assertThat(names.length, is(4)); + assertThat(names, arrayContaining( + "name1", + ".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now), + "name2", + ".logstash-" + DateTimeFormat.forPattern("YYYY.MM").print(now.withDayOfMonth(1)))); + } + + @Test(expected = DynamicIndexName.ParseException.class) + public void testExpression_Invalid_Unescaped() throws Exception { + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + parser.parse("<.mar}vel-{now/d}>"); + } + + @Test(expected = DynamicIndexName.ParseException.class) + public void testExpression_Invalid_DateMathFormat() throws Exception { + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + parser.parse("<.marvel-{now/d{}>"); + } + + @Test(expected = DynamicIndexName.ParseException.class) + public void testExpression_Invalid_EmptyDateMathFormat() throws Exception { + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + parser.parse("<.marvel-{now/d{}}>"); + } + + @Test(expected = DynamicIndexName.ParseException.class) + public void testExpression_Invalid_OpenEnded() throws Exception { + DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd"); + parser.parse("<.marvel-{now/d>"); + } + + public void testDefaultDateFormat_Default() throws Exception { + String dateFormat = DynamicIndexName.defaultDateFormat(Settings.EMPTY); + assertThat(dateFormat, is("YYYY.MM.dd")); + } + + public void testDefaultDateFormat() throws Exception { + Settings settings = Settings.builder() + .put("watcher.dynamic_indices.default_date_format", "YYYY.MM") + .build(); + String dateFormat = randomBoolean() ? + DynamicIndexName.defaultDateFormat(settings) : + DynamicIndexName.defaultDateFormat(settings, null); + assertThat(dateFormat, is("YYYY.MM")); + } + + public void testDefaultDateFormat_Component() throws Exception { + Settings settings = Settings.builder() + .put("watcher.dynamic_indices.default_date_format", "YYYY.MM") + .put("watcher.foo.dynamic_indices.default_date_format", "YYY.MM") + .build(); + String dateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.foo"); + assertThat(dateFormat, is("YYY.MM")); + } +} diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 5a0613685cd..84c24338b20 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -261,6 +261,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return builder.get().getHits().getTotalHits(); } + protected SearchResponse searchHistory(SearchSourceBuilder builder) { + return client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSource(builder.buildAsBytes()).get(); + } + protected T getInstanceFromMaster(Class type) { return internalTestCluster().getInstance(type, internalTestCluster().getMasterName()); } diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index bd701873523..94652fdd8e4 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -42,6 +42,7 @@ import org.elasticsearch.watcher.input.search.ExecutableSearchInput; import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.watcher.license.LicenseService; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.http.HttpClient; @@ -222,7 +223,7 @@ public final class WatcherTestUtils { new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger), new ExecutableScriptCondition(new ScriptCondition(Script.inline("return true").build()), logger, scriptService), - new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client), + new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client, new DynamicIndexName.Parser()), new TimeValue(0), new ExecutableActions(actions), metadata, diff --git a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java index 023d064d11a..feb5f20e782 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java @@ -5,23 +5,23 @@ */ package org.elasticsearch.watcher.transform.search; +import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Requests; import org.elasticsearch.common.bytes.BytesReference; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.plugins.PluginsService; -import org.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.actions.ExecutableActions; @@ -30,6 +30,7 @@ import org.elasticsearch.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.transform.Transform; @@ -40,6 +41,8 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.WatchStatus; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; import org.junit.Test; import java.io.IOException; @@ -51,7 +54,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import static org.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -61,11 +63,12 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; import static org.elasticsearch.watcher.support.WatcherDateTimeUtils.parseDate; import static org.elasticsearch.watcher.test.WatcherTestUtils.*; import static org.hamcrest.Matchers.*; +import static org.joda.time.DateTimeZone.UTC; /** * */ -@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) public class SearchTransformTests extends ElasticsearchIntegrationTest { @Override @@ -126,7 +129,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .endObject() .endObject()); SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); - ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); + ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); @@ -163,7 +166,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .endObject() .endObject()); SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); - ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); + ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); @@ -209,7 +212,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .must(termQuery("value", "{{ctx.payload.value}}"))))); SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); - ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); + ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC)); WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD); @@ -269,9 +272,13 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .endObject(); builder.endObject(); + XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); - ExecutableSearchTransform executable = new SearchTransformFactory(Settings.EMPTY, ClientProxy.of(client())).parseExecutable("_id", parser); + + SearchTransformFactory transformFactory = new SearchTransformFactory(Settings.EMPTY, ClientProxy.of(client())); + ExecutableSearchTransform executable = transformFactory.parseExecutable("_id", parser); + assertThat(executable, notNullValue()); assertThat(executable.type(), is(SearchTransform.TYPE)); assertThat(executable.transform().getRequest(), notNullValue()); @@ -287,6 +294,51 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes())); } + @Test + public void testParser_WithIndexNames() throws Exception { + SearchType searchType = getRandomSupportedSearchType(); + XContentBuilder builder = jsonBuilder().startObject(); + builder.array("indices", "idx", ""); + if (searchType != null) { + builder.field("search_type", searchType.name()); + } + + builder.startObject("body") + .startObject("query") + .startObject("match_all") + .endObject() + .endObject() + .endObject(); + + builder.endObject(); + + XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); + parser.nextToken(); + + DynamicIndexName.Parser indexNamesParser = new DynamicIndexName.Parser(); + String dateFormat; + Settings settings; + if (randomBoolean()) { + dateFormat = DynamicIndexName.DEFAULT_DATE_FORMAT; + settings = Settings.EMPTY; + } else { + dateFormat = "YYYY-MM"; + settings = Settings.builder() + .put("watcher.transform.search.dynamic_indices.default_date_format", dateFormat) + .build(); + } + SearchTransformFactory transformFactory = new SearchTransformFactory(settings, ClientProxy.of(client())); + + ExecutableSearchTransform executable = transformFactory.parseExecutable("_id", parser); + DynamicIndexName[] indexNames = executable.indexNames(); + assertThat(indexNames, notNullValue()); + DateTime now = DateTime.now(UTC); + String[] names = DynamicIndexName.names(indexNames, now); + assertThat(names, notNullValue()); + assertThat(names.length, is(2)); + assertThat(names, arrayContaining("idx", "idx-" + DateTimeFormat.forPattern(dateFormat).print(now.minusDays(3)))); + } + @Test(expected = SearchTransformException.class) public void testParser_ScanNotSupported() throws Exception { SearchRequest request = client().prepareSearch() @@ -300,6 +352,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); + DynamicIndexName.Parser indexNamesParser = new DynamicIndexName.Parser(); SearchTransformFactory factory = new SearchTransformFactory(Settings.EMPTY, ClientProxy.of(client())); factory.parseTransform("_id", parser); @@ -319,7 +372,10 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," + "\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" + "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + - "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"vars\":{},\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; + "\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{" + + "\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"vars\":{},\"watch_id\":\"test-watch\",\"payload\":{}," + + "\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"}," + + "\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}"; Map params = new HashMap<>(); params.put("seconds_param", "30s"); @@ -348,7 +404,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { "{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," + "\"include_lower\":true,\"include_upper\":true}}}}}}"; - PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-script", templateQuery).request(); + PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache", "test-script", templateQuery).request(); assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true)); Map params = new HashMap<>(); @@ -439,7 +495,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { ensureGreen("test-search-index"); SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); - ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); + ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY); } diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 2125698bfc8..750c313c460 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -6,10 +6,9 @@ package org.elasticsearch.watcher.watch; import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet; -import org.elasticsearch.common.bytes.BytesReference; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.joda.time.DateTime; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -59,6 +58,7 @@ import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInputFactory; import org.elasticsearch.watcher.license.LicenseService; +import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.clock.Clock; @@ -94,22 +94,24 @@ import org.elasticsearch.watcher.trigger.TriggerEngine; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.schedule.*; import org.elasticsearch.watcher.trigger.schedule.support.*; +import org.joda.time.DateTime; import org.junit.Before; import org.junit.Test; import java.util.Collection; import java.util.Map; -import static org.joda.time.DateTimeZone.UTC; import static org.elasticsearch.watcher.input.InputBuilders.searchInput; import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest; import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; import static org.hamcrest.Matchers.*; +import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; public class WatchTests extends ElasticsearchTestCase { private ScriptServiceProxy scriptService; + private DynamicIndexName.Parser indexNamesParser; private ClientProxy client; private HttpClient httpClient; private EmailService emailService; @@ -119,11 +121,13 @@ public class WatchTests extends ElasticsearchTestCase { private SecretService secretService; private LicenseService licenseService; private ESLogger logger; + private DynamicIndexName.Parser indexNameParser; private Settings settings = Settings.EMPTY; @Before public void init() throws Exception { scriptService = mock(ScriptServiceProxy.class); + indexNamesParser = new DynamicIndexName.Parser(); client = mock(ClientProxy.class); httpClient = mock(HttpClient.class); emailService = mock(EmailService.class); @@ -133,9 +137,10 @@ public class WatchTests extends ElasticsearchTestCase { licenseService = mock(LicenseService.class); authRegistry = new HttpAuthRegistry(ImmutableMap.of("basic", (HttpAuthFactory) new BasicAuthFactory(secretService))); logger = Loggers.getLogger(WatchTests.class); + indexNameParser = new DynamicIndexName.Parser(); } - @Test //@Repeat(iterations = 20) + @Test public void testParser_SelfGenerated() throws Exception { DateTime now = new DateTime(UTC); ClockMock clock = new ClockMock(); @@ -307,7 +312,7 @@ public class WatchTests extends ElasticsearchTestCase { switch (type) { case SearchInput.TYPE: SearchInput searchInput = searchInput(WatcherTestUtils.newInputSearchRequest("idx")).build(); - return new ExecutableSearchInput(searchInput, logger, client); + return new ExecutableSearchInput(searchInput, logger, client, indexNameParser); default: SimpleInput simpleInput = InputBuilders.simpleInput(ImmutableMap.builder().put("_key", "_val")).build(); return new ExecutableSimpleInput(simpleInput, logger); @@ -359,13 +364,13 @@ public class WatchTests extends ElasticsearchTestCase { case ScriptTransform.TYPE: return new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService); case SearchTransform.TYPE: - return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client); + return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser); default: // chain ChainTransform chainTransform = new ChainTransform(ImmutableList.of( new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), new ScriptTransform(Script.inline("_script").build()))); return new ExecutableChainTransform(chainTransform, logger, ImmutableList.of( - new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client), + new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser), new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService))); } } @@ -389,8 +394,8 @@ public class WatchTests extends ElasticsearchTestCase { list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer))); } if (randomBoolean()) { - IndexAction aciton = new IndexAction("_index", "_type", null); - list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(aciton, logger, client))); + IndexAction action = new IndexAction("_index", "_type", null); + list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(action, logger, client, indexNameParser))); } if (randomBoolean()) { HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000)) @@ -411,7 +416,7 @@ public class WatchTests extends ElasticsearchTestCase { parsers.put(EmailAction.TYPE, new EmailActionFactory(settings, emailService, templateEngine, htmlSanitizer)); break; case IndexAction.TYPE: - parsers.put(IndexAction.TYPE, new IndexActionFactory(settings, client)); + parsers.put(IndexAction.TYPE, new IndexActionFactory(settings, client, indexNamesParser)); break; case WebhookAction.TYPE: parsers.put(WebhookAction.TYPE, new WebhookActionFactory(settings, httpClient,