Added Dynamic Index Names Support

The search input/transform rely on users configuring the search requests. Sometimes (often), these search requests are executed on time-based indices. The problem the

Until now, there's no way to define dynamic index names that are bound to time, which forces the request to search all the indices (for example, the Marvel watches se

This commit adds dynamic index name resolution. It works in the following way:

- and index name can be a simple string (indicating the static/absolute index name you're searching, incl. wildcards)
- an index name can also be a template. The template is enclosed within `<` and `>` (these are officially illegal characters for index names, so these are safe to use)
- the template can have both static parts to it and place holder parts. The place holders are enclosed within `{` and `}`. The place holder holds `date math` expression
 * `"<.marvel-{now}>"` will resolve to `".marvel-2022.03.03"` (the default date format is `YYYY.MM.dd`)
 * `"<.marvel-{now/M}>"` will resolve to `".marvel-2022.03.01"`
 * `"<.marvel-{now{YYYY.MM}}>"` will resolve to `".marvel-2022.03"` (this one has a custom date format - `YYYY.MM`)
 * `"<.marvel-{now/M-1M{YYYY.MM}}>"` will resolve to `".marvel-2022.02"`

The following is an example of a search input that searches marvel indices for the last 3 days (relies on the default Marvel indices format - `.marvel-YYYY.MM.dd`):

```
{
    ...
    "input" : {
            "search" : {
                    "request" : {
                            "indices" : [
                                    "<.marvel-{now/d-2d}>",
                                    "<.marvel-{now/d-1d}>",
                                    "<.marvel-{now/d}>"
                            ],
                            ...
                    }
            }
    }
    ...
}
```

- `index` action was also updated to work with a dynamic index name (e.g. it's possible to index into daily indices by setting the index name to `<idx-{now}>`)

Original commit: elastic/x-pack-elasticsearch@9c15a96029
This commit is contained in:
uboness 2015-05-16 00:55:23 +02:00
parent d2d1c44d07
commit 71aa3a8059
21 changed files with 878 additions and 75 deletions

View File

@ -19,6 +19,7 @@ import org.elasticsearch.watcher.input.InputModule;
import org.elasticsearch.watcher.license.LicenseModule;
import org.elasticsearch.watcher.rest.WatcherRestModule;
import org.elasticsearch.watcher.shield.WatcherShieldModule;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.clock.ClockModule;
import org.elasticsearch.watcher.support.http.HttpClientModule;
@ -70,6 +71,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
bind(WatcherLifeCycleService.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
bind(WatcherSettingsValidation.class).asEagerSingleton();
bind(DynamicIndexName.Parser.class).asEagerSingleton();
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.ExecutableAction;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.ArrayObjectIterator;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
@ -32,10 +33,16 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
private final ClientProxy client;
private final DynamicIndexName indexName;
public ExecutableIndexAction(IndexAction action, ESLogger logger, ClientProxy client) {
public ExecutableIndexAction(IndexAction action, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
super(action, logger);
this.client = client;
this.indexName = indexNameParser.parse(action.index);
}
DynamicIndexName indexName() {
return indexName;
}
@Override
@ -57,7 +64,8 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.index(indexName.name(ctx.executionTime()));
indexRequest.type(action.docType);
if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) {
@ -72,7 +80,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
indexRequest.source(jsonBuilder().prettyPrint().map(data));
if (ctx.simulateAction(actionId)) {
return new IndexAction.Result.Simulated(action.index, action.docType, new XContentSource(indexRequest.source(), XContentType.JSON));
return new IndexAction.Result.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(), XContentType.JSON));
}
IndexResponse response = client.index(indexRequest);
@ -89,7 +97,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
Map<String, Object> doc = (Map<String, Object>) item;
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.index(indexName.name(ctx.executionTime()));
indexRequest.type(action.docType);
if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) {
if (!(doc instanceof HashMap)) {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import java.io.IOException;
@ -91,10 +92,14 @@ public class IndexAction implements Action {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (Field.INDEX.match(currentFieldName)) {
} else if (Field.INDEX.match(currentFieldName)) {
try {
index = parser.text();
} else if (Field.DOC_TYPE.match(currentFieldName)) {
} catch (DynamicIndexName.ParseException pe) {
throw new IndexActionException("could not parse [{}] action [{}/{}]. failed to parse index name value for field [{}]", pe, TYPE, watchId, actionId, currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (Field.DOC_TYPE.match(currentFieldName)) {
docType = parser.text();
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) {
executionTimeField = parser.text();

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.actions.ActionFactory;
import org.elasticsearch.watcher.actions.email.ExecutableEmailAction;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
@ -21,11 +22,14 @@ import java.io.IOException;
public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableIndexAction> {
private final ClientProxy client;
private final DynamicIndexName.Parser indexNamesParser;
@Inject
public IndexActionFactory(Settings settings, ClientProxy client) {
public IndexActionFactory(Settings settings, ClientProxy client, DynamicIndexName.Parser indexNamesParser) {
super(Loggers.getLogger(ExecutableEmailAction.class, settings));
this.client = client;
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.actions.index");
this.indexNamesParser = new DynamicIndexName.Parser(defaultDateFormat);
}
@Override
@ -40,6 +44,6 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
@Override
public ExecutableIndexAction createExecutable(IndexAction action) {
return new ExecutableIndexAction(action, actionLogger, client);
return new ExecutableIndexAction(action, actionLogger, client, indexNamesParser);
}
}

View File

@ -36,7 +36,7 @@ public abstract class InputFactory<I extends Input, R extends Input.Result, E ex
*/
public abstract E createExecutable(I input);
public ExecutableInput parseExecutable(String watchId, XContentParser parser) throws IOException {
public E parseExecutable(String watchId, XContentParser parser) throws IOException {
I input = parseInput(watchId, parser);
return createExecutable(input);
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
@ -33,16 +35,23 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
private final ClientProxy client;
private final @Nullable DynamicIndexName[] indexNames;
public ExecutableSearchInput(SearchInput input, ESLogger logger, ClientProxy client) {
public ExecutableSearchInput(SearchInput input, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
super(input, logger);
this.client = client;
String[] indices = input.getSearchRequest().indices();
indexNames = indices != null ? indexNameParser.parse(indices) : null;
}
DynamicIndexName[] indexNames() {
return indexNames;
}
public SearchInput.Result execute(WatchExecutionContext ctx) {
SearchRequest request = null;
try {
request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, null);
request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), indexNames, ctx, null);
return doExecute(ctx, request);
} catch (Exception e) {
logger.error("failed to execute [{}] input for [{}]", e, SearchInput.TYPE, ctx.watch());
@ -51,7 +60,6 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
}
SearchInput.Result doExecute(WatchExecutionContext ctx, SearchRequest request) throws Exception {
if (logger.isTraceEnabled()) {
BytesReference source = request.source() != null ? request.source() : request.templateSource();
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(source, false, true));

View File

@ -116,7 +116,6 @@ public class SearchInput implements Input {
if (request == null) {
throw new SearchInputException("could not parse [{}] input for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
}
return new SearchInput(request, extract);
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
@ -21,11 +22,14 @@ import java.io.IOException;
public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Result, ExecutableSearchInput> {
private final ClientProxy client;
private final DynamicIndexName.Parser indexNameParser;
@Inject
public SearchInputFactory(Settings settings, ClientProxy client) {
super(Loggers.getLogger(ExecutableSimpleInput.class, settings));
this.client = client;
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.input.search");
this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat);
}
@Override
@ -40,6 +44,6 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
@Override
public ExecutableSearchInput createExecutable(SearchInput input) {
return new ExecutableSearchInput(input, inputLogger, client);
return new ExecutableSearchInput(input, inputLogger, client, indexNameParser);
}
}

View File

@ -0,0 +1,317 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherException;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
/**
*
*/
public class DynamicIndexName implements ToXContent {
public static final String DEFAULT_DATE_FORMAT = "YYYY.MM.dd";
private static final String EXPRESSION_LEFT_BOUND = "<";
private static final String EXPRESSION_RIGHT_BOUND = ">";
private static final char LEFT_BOUND = '{';
private static final char RIGHT_BOUND = '}';
private static final char ESCAPE_CHAR = '\\';
private final String text;
private final Expression expression;
private DynamicIndexName(String text, Expression expression) {
this.text = text;
this.expression = expression;
}
public String text() {
return text;
}
public String name(DateTime now) {
return expression.eval(now);
}
public static String[] names(DynamicIndexName[] indexNames, DateTime now) {
String[] names = new String[indexNames.length];
for (int i = 0; i < names.length; i++) {
names[i] = indexNames[i].name(now);
}
return names;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DynamicIndexName that = (DynamicIndexName) o;
return text.equals(that.text);
}
@Override
public int hashCode() {
return text.hashCode();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(text);
}
public static String defaultDateFormat(Settings settings, String componentPrefix) {
if (componentPrefix == null) {
return defaultDateFormat(settings);
}
return settings.get(componentPrefix + ".dynamic_indices.default_date_format", defaultDateFormat(settings));
}
public static String defaultDateFormat(Settings settings) {
return settings.get("watcher.dynamic_indices.default_date_format", DEFAULT_DATE_FORMAT);
}
interface Expression {
String eval(DateTime now);
}
static class StaticExpression implements Expression {
private final String value;
public StaticExpression(String value) {
this.value = value;
}
@Override
public String eval(DateTime now) {
return value;
}
}
static class CompoundExpression implements Expression {
private final Expression[] parts;
public CompoundExpression(Expression[] parts) {
this.parts = parts;
}
@Override
public String eval(DateTime now) {
StringBuilder sb = new StringBuilder();
for (Expression part : parts) {
sb.append(part.eval(now));
}
return sb.toString();
}
static Expression parse(String defaultDateFormat, char[] text, int from, int length) {
boolean dynamic = false;
List<Expression> expressions = new ArrayList<>();
StringBuilder sb = new StringBuilder();
boolean inPlaceHolder = false;
boolean inDateFormat = false;
boolean escape = false;
for (int i = from; i < length; i++) {
boolean escapedChar = escape;
if (escape) {
escape = false;
}
char c = text[i];
if (c == ESCAPE_CHAR) {
if (escapedChar) {
sb.append(c);
escape = false;
} else {
escape = true;
}
continue;
}
if (inPlaceHolder) {
switch (c) {
case LEFT_BOUND:
if (inDateFormat && escapedChar) {
sb.append(c);
} else if (!inDateFormat) {
inDateFormat = true;
sb.append(c);
} else {
throw new ParseException("invalid dynamic name expression [{}]. invalid character in placeholder at position [{}]", new String(text, from, length), i);
}
break;
case RIGHT_BOUND:
if (inDateFormat && escapedChar) {
sb.append(c);
} else if (inDateFormat) {
inDateFormat = false;
sb.append(c);
} else {
expressions.add(new DateMathExpression(defaultDateFormat, sb.toString()));
sb = new StringBuilder();
inPlaceHolder = false;
dynamic = true;
}
break;
default:
sb.append(c);
}
} else {
switch (c) {
case LEFT_BOUND:
if (escapedChar) {
sb.append(c);
} else {
expressions.add(new StaticExpression(sb.toString()));
sb = new StringBuilder();
inPlaceHolder = true;
}
break;
case RIGHT_BOUND:
if (!escapedChar) {
throw new ParseException("invalid dynamic name expression [{}]. invalid character at position [{}]. " +
"`{` and `}` are reserved characters and should be escaped when used as part of the index name using `\\` (e.g. `\\{text\\}`)", new String(text, from, length), i);
}
default:
sb.append(c);
}
}
}
if (inPlaceHolder) {
throw new ParseException("invalid dynamic name expression [{}]. date math placeholder is open ended", new String(text, from, length));
}
if (sb.length() > 0) {
expressions.add(new StaticExpression(sb.toString()));
}
if (!dynamic) {
// if all the expressions are static... lets optimize to a single static expression
sb = new StringBuilder();
for (Expression expression : expressions) {
sb.append(((StaticExpression) expression).value);
}
return new StaticExpression(sb.toString());
}
if (expressions.size() == 1) {
return expressions.get(0);
}
return new CompoundExpression(expressions.toArray(new Expression[expressions.size()]));
}
}
static class DateMathExpression implements Expression {
private final DateMathParser dateMathParser;
private final String mathExpression;
private final FormatDateTimeFormatter formatter;
public DateMathExpression(String defaultFormat, String expression) {
int i = expression.indexOf(LEFT_BOUND);
if (i < 0) {
mathExpression = expression;
formatter = Joda.forPattern(defaultFormat);
} else {
if (expression.lastIndexOf(RIGHT_BOUND) != expression.length() - 1) {
throw new ParseException("invalid dynamic name expression [{}]. missing closing `}` for date math format", expression);
}
if (i == expression.length() - 2) {
throw new ParseException("invalid dynamic name expression [{}]. missing date format", expression);
}
mathExpression = expression.substring(0, i);
formatter = Joda.forPattern(expression.substring(i + 1, expression.length() - 1));
}
dateMathParser = new DateMathParser(formatter);
}
@Override
public String eval(final DateTime now) {
long millis = dateMathParser.parse(mathExpression, new Callable<Long>() {
@Override
public Long call() throws Exception {
return now.getMillis();
}
});
return formatter.printer().print(millis);
}
}
public static class Parser {
private final String defaultDateFormat;
public Parser() {
this(DEFAULT_DATE_FORMAT);
}
public Parser(String defaultDateFormat) {
this.defaultDateFormat = defaultDateFormat;
}
public DynamicIndexName parse(String template) {
if (template == null) {
return null;
}
if (!template.startsWith(EXPRESSION_LEFT_BOUND) || !template.endsWith(EXPRESSION_RIGHT_BOUND)) {
return new DynamicIndexName(template, new StaticExpression(template));
}
return new DynamicIndexName(template, CompoundExpression.parse(defaultDateFormat, template.toCharArray(), 1, template.length() - 1));
}
public DynamicIndexName[] parse(String[] templates) {
if (templates.length == 0) {
return null;
}
DynamicIndexName[] dynamicIndexNames = new DynamicIndexName[templates.length];
for (int i = 0; i < dynamicIndexNames.length; i++) {
dynamicIndexNames[i] = parse(templates[i]);
}
return dynamicIndexNames;
}
public DynamicIndexName parse(XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.VALUE_STRING) {
throw new ParseException("could not parse index name. expected a string value but found [{}] instead", parser.currentToken());
}
return parse(parser.text());
}
}
public static class ParseException extends WatcherException {
public ParseException(String msg, Object... args) {
super(msg, args);
}
public ParseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.support;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
@ -55,11 +56,16 @@ public final class WatcherUtils {
}
}
public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException {
public static SearchRequest createSearchRequestFromPrototype(SearchRequest requestPrototype, @Nullable DynamicIndexName[] dynamicIndexNames, WatchExecutionContext ctx, Payload payload) throws IOException {
String[] indices = dynamicIndexNames == null ?
requestPrototype.indices() :
DynamicIndexName.names(dynamicIndexNames, ctx.executionTime());
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
.indices(requestPrototype.indices())
.indices(indices)
.types(requestPrototype.types());
// TODO: Revise this search template conversion code once search templates in core have been refactored once ES 2.0 is released.

View File

@ -8,8 +8,10 @@ package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.transform.ExecutableTransform;
@ -23,17 +25,24 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
protected final ClientProxy client;
private final @Nullable DynamicIndexName[] indexNames;
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ClientProxy client) {
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
super(transform, logger);
this.client = client;
String[] indices = transform.request.indices();
this.indexNames = indices != null ? indexNameParser.parse(indices) : null;
}
DynamicIndexName[] indexNames() {
return indexNames;
}
@Override
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) {
SearchRequest request = null;
try {
request = WatcherUtils.createSearchRequestFromPrototype(transform.request, ctx, payload);
request = WatcherUtils.createSearchRequestFromPrototype(transform.request, indexNames, ctx, payload);
SearchResponse resp = client.search(request);
return new SearchTransform.Result(request, new Payload.XContent(resp));
} catch (Exception e) {
@ -41,5 +50,4 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
return new SearchTransform.Result(request, e);
}
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.transform.TransformFactory;
@ -20,11 +21,14 @@ import java.io.IOException;
public class SearchTransformFactory extends TransformFactory<SearchTransform, SearchTransform.Result, ExecutableSearchTransform> {
protected final ClientProxy client;
protected final DynamicIndexName.Parser indexNameParser;
@Inject
public SearchTransformFactory(Settings settings, ClientProxy client) {
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));
this.client = client;
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.transform.search");
this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat);
}
@Override
@ -39,6 +43,6 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
@Override
public ExecutableSearchTransform createExecutable(SearchTransform transform) {
return new ExecutableSearchTransform(transform, transformLogger, client);
return new ExecutableSearchTransform(transform, transformLogger, client, indexNameParser);
}
}

View File

@ -203,13 +203,19 @@
"type" : "object",
"dynamic" : true,
"properties" : {
"indices" : {
"type" : "string",
"index" : "not_analyzed"
},
"types" : {
"type" : "string",
"index" : "not_analyzed"
"request" : {
"type" : "object",
"dynamic" : true,
"properties" : {
"indices" : {
"type" : "string",
"index" : "not_analyzed"
},
"types" : {
"type" : "string",
"index" : "not_analyzed"
}
}
}
}
}

View File

@ -21,13 +21,14 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.Action.Result.Status;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import java.util.Map;
@ -37,6 +38,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.*;
import static org.joda.time.DateTimeZone.UTC;
/**
*/
@ -71,8 +73,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
}
IndexAction action = new IndexAction("test-index", "test-type", timestampField);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()));
DateTime executionTime = DateTime.now(DateTimeZone.UTC);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
DateTime executionTime = DateTime.now(UTC);
Payload payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", ImmutableMap.of("foo", "bar"));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
@ -134,8 +136,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
);
IndexAction action = new IndexAction("test-index", "test-type", timestampField);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()));
DateTime executionTime = DateTime.now(DateTimeZone.UTC);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
DateTime executionTime = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list));
Action.Result result = executable.execute("_id", ctx, ctx.payload());
@ -197,7 +199,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()));
DynamicIndexName.Parser indexNameParser = new DynamicIndexName.Parser();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()), indexNameParser);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -210,6 +213,27 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
}
}
@Test
public void testParser_DynamicIndex() throws Exception {
XContentBuilder builder = jsonBuilder();
builder.startObject()
.field(IndexAction.Field.INDEX.getPreferredName(), "<idx-{now/d}>")
.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type")
.endObject();
DynamicIndexName.Parser indexNameParser = new DynamicIndexName.Parser();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()), indexNameParser);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ExecutableIndexAction executable = actionParser.parseExecutable(randomAsciiOfLength(5), randomAsciiOfLength(3), parser);
DateTime now = DateTime.now(UTC);
assertThat(executable, notNullValue());
assertThat(executable.action().index, is("<idx-{now/d}>"));
assertThat(executable.indexName().name(now), is("idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
}
@Test
public void testParser_Failure() throws Exception {
XContentBuilder builder = jsonBuilder();
@ -225,7 +249,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
}
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()));
DynamicIndexName.Parser indexNameParser = new DynamicIndexName.Parser();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()), indexNameParser);
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
try {
@ -237,5 +262,4 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
assertThat(useIndex && useType, equalTo(false));
}
}
}

View File

@ -5,21 +5,22 @@
*/
package org.elasticsearch.watcher.input.search;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.plugins.PluginsService;
import org.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.watcher.actions.ActionStatus;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
@ -28,6 +29,7 @@ import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
@ -36,7 +38,8 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStatus;
import org.joda.time.DateTimeZone;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import java.io.IOException;
@ -51,18 +54,17 @@ import java.util.Map;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.elasticsearch.watcher.test.WatcherTestUtils.areJsonEquivalent;
import static org.elasticsearch.watcher.test.WatcherTestUtils.getRandomSupportedSearchType;
import static org.hamcrest.Matchers.*;
import static org.joda.time.DateTimeZone.UTC;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SearchInputTests extends ElasticsearchIntegrationTest {
private final static String TEMPLATE_QUERY = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
@ -113,7 +115,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()));
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -124,8 +126,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
new ExecutableActions(new ArrayList<ActionWrapper>()),
null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())),
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)),
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5));
SearchInput.Result result = searchInput.execute(ctx);
@ -220,7 +222,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()));
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -231,8 +233,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
new ExecutableActions(new ArrayList<ActionWrapper>()),
null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())),
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)),
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5));
SearchInput.Result result = searchInput.execute(ctx);
@ -261,6 +263,43 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
assertEquals(SearchInput.TYPE, searchInput.type());
}
@Test
public void testParser_IndexNames() throws Exception {
SearchRequest request = client().prepareSearch()
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
.setIndices("test", "<test-{now/M-1M}>")
.request()
.source(searchSource()
.query(boolQuery().must(matchQuery("event_type", "a")).filter(rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
String dateFormat;
Settings settings;
if (randomBoolean()) {
dateFormat = DynamicIndexName.DEFAULT_DATE_FORMAT;
settings = Settings.EMPTY;
} else {
dateFormat = "YYYY-MM";
settings = Settings.builder()
.put("watcher.input.search.dynamic_indices.default_date_format", dateFormat)
.build();
}
SearchInputFactory factory = new SearchInputFactory(settings, ClientProxy.of(client()));
ExecutableSearchInput executable = factory.parseExecutable("_id", parser);
DynamicIndexName[] indexNames = executable.indexNames();
assertThat(indexNames, notNullValue());
DateTime now = DateTime.now(UTC);
String[] names = DynamicIndexName.names(indexNames, now);
assertThat(names, notNullValue());
assertThat(names.length, is(2));
assertThat(names, arrayContaining("test", "test-" + DateTimeFormat.forPattern(dateFormat).print(now.withDayOfMonth(1).minusMonths(1))));
}
@Test(expected = SearchInputException.class)
public void testParser_ScanNotSupported() throws Exception {
SearchRequest request = client().prepareSearch()
@ -290,8 +329,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
new ExecutableActions(new ArrayList<ActionWrapper>()),
null,
new WatchStatus(ImmutableMap.<String, ActionStatus>of())),
new DateTime(60000, DateTimeZone.UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, DateTimeZone.UTC), new DateTime(60000, DateTimeZone.UTC)),
new DateTime(60000, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(60000, UTC), new DateTime(60000, UTC)),
timeValueSeconds(5));
}
@ -302,7 +341,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput si = siBuilder.build();
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()));
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
return searchInput.execute(ctx);
}

View File

@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
/**
*/
public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return true;
}
@Override
protected boolean enableShield() {
return false; // reduce noise
}
@Test
public void testDynamicIndexAction() throws Exception {
WatcherClient watcherClient = watcherClient();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(simpleInput("key", "value"))
.condition(alwaysCondition())
.addAction("dynamic_index", indexAction("<idx-{now}>", "type")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger("_id");
refresh();
assertWatchWithMinimumPerformedActionsCount("_id", 1, false);
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().now());
logger.info("checking index [{}]", indexName);
assertBusy(new Runnable() {
@Override
public void run() {
flush();
refresh();
long docCount = docCount(indexName, "type", matchAllQuery());
assertThat(docCount, is(1L));
}
});
}
@Test
public void testDynamicIndexSearchInput() throws Exception {
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().now());
createIndex(indexName);
index(indexName, "type", "1", "key", "value");
flush();
refresh();
WatcherClient watcherClient = watcherClient();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(new SearchRequest("<idx-{now/d}>").types("type"))))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger("_id");
flush();
refresh();
SearchResponse response = searchHistory(searchSource().query(matchQuery("result.input.search.request.indices", indexName)));
assertThat(response.getHits().getTotalHits(), is(1L));
}
@Test
public void testDynamicIndexSearchTransform() throws Exception {
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().now());
createIndex(indexName);
index(indexName, "type", "1", "key", "value");
flush();
refresh();
WatcherClient watcherClient = watcherClient();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id")
.setSource(watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.transform(searchTransform(new SearchRequest("<idx-{now/d}>").types("type")))
.addAction("log", loggingAction("heya")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger("_id");
flush();
refresh();
SearchResponse response = searchWatchRecords(new Callback<SearchRequestBuilder>() {
@Override
public void handle(SearchRequestBuilder builder) {
builder.setQuery(matchQuery("result.transform.search.request.indices", indexName));
}
});
assertThat(response.getHits().getTotalHits(), is(1L));
}
}

View File

@ -0,0 +1,169 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class DynamicIndexNameTests extends ElasticsearchTestCase {
@Test
public void testNormal() throws Exception {
String indexName = randomAsciiOfLength(10);
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse(indexName);
String name = indexNames.name(now);
assertThat(name, equalTo(indexName));
}
@Test
public void testExpression() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse("<.marvel-{now}>");
String name = indexNames.name(now);
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
}
@Test
public void testNullOrEmpty() throws Exception {
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexName = parser.parse((String) null);
assertThat(indexName, nullValue());
DynamicIndexName[] indexNames = parser.parse(Strings.EMPTY_ARRAY);
assertThat(indexNames, nullValue());
}
@Test
public void testExpression_Static() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse("<.marvel-test>");
String name = indexNames.name(now);
assertThat(name, equalTo(".marvel-test"));
}
@Test
public void testExpression_MultiParts() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse("<.text1-{now/d}-text2-{now/M}>");
String name = indexNames.name(now);
assertThat(name, equalTo(".text1-"
+ DateTimeFormat.forPattern("YYYY.MM.dd").print(now)
+ "-text2-"
+ DateTimeFormat.forPattern("YYYY.MM.dd").print(now.withDayOfMonth(1))));
}
@Test
public void testExpression_CustomFormat() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{YYYY.MM.dd}}>");
String name = indexNames.name(now);
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
}
@Test
public void testExpression_EscapeStatic() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse("<.mar\\{v\\}el-{now/d}>");
String name = indexNames.name(now);
assertThat(name, equalTo(".mar{v}el-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
}
@Test
public void testExpression_EscapeDateFormat() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{'\\{year\\}'YYYY}}>");
String name = indexNames.name(now);
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("'{year}'YYYY").print(now)));
}
@Test
public void testExpression_MixedArray() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
DynamicIndexName[] indexNames = parser.parse(new String[] {
"name1",
"<.marvel-{now/d}>",
"name2",
"<.logstash-{now/M{YYYY.MM}}>"
});
String[] names = new String[indexNames.length];
for (int i = 0; i < names.length; i++) {
names[i] = indexNames[i].name(now);
}
assertThat(names.length, is(4));
assertThat(names, arrayContaining(
"name1",
".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now),
"name2",
".logstash-" + DateTimeFormat.forPattern("YYYY.MM").print(now.withDayOfMonth(1))));
}
@Test(expected = DynamicIndexName.ParseException.class)
public void testExpression_Invalid_Unescaped() throws Exception {
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
parser.parse("<.mar}vel-{now/d}>");
}
@Test(expected = DynamicIndexName.ParseException.class)
public void testExpression_Invalid_DateMathFormat() throws Exception {
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
parser.parse("<.marvel-{now/d{}>");
}
@Test(expected = DynamicIndexName.ParseException.class)
public void testExpression_Invalid_EmptyDateMathFormat() throws Exception {
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
parser.parse("<.marvel-{now/d{}}>");
}
@Test(expected = DynamicIndexName.ParseException.class)
public void testExpression_Invalid_OpenEnded() throws Exception {
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
parser.parse("<.marvel-{now/d>");
}
public void testDefaultDateFormat_Default() throws Exception {
String dateFormat = DynamicIndexName.defaultDateFormat(Settings.EMPTY);
assertThat(dateFormat, is("YYYY.MM.dd"));
}
public void testDefaultDateFormat() throws Exception {
Settings settings = Settings.builder()
.put("watcher.dynamic_indices.default_date_format", "YYYY.MM")
.build();
String dateFormat = randomBoolean() ?
DynamicIndexName.defaultDateFormat(settings) :
DynamicIndexName.defaultDateFormat(settings, null);
assertThat(dateFormat, is("YYYY.MM"));
}
public void testDefaultDateFormat_Component() throws Exception {
Settings settings = Settings.builder()
.put("watcher.dynamic_indices.default_date_format", "YYYY.MM")
.put("watcher.foo.dynamic_indices.default_date_format", "YYY.MM")
.build();
String dateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.foo");
assertThat(dateFormat, is("YYY.MM"));
}
}

View File

@ -261,6 +261,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return builder.get().getHits().getTotalHits();
}
protected SearchResponse searchHistory(SearchSourceBuilder builder) {
return client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSource(builder.buildAsBytes()).get();
}
protected <T> T getInstanceFromMaster(Class<T> type) {
return internalTestCluster().getInstance(type, internalTestCluster().getMasterName());
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.http.HttpClient;
@ -222,7 +223,7 @@ public final class WatcherTestUtils {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
new ExecutableScriptCondition(new ScriptCondition(Script.inline("return true").build()), logger, scriptService),
new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client),
new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client, new DynamicIndexName.Parser()),
new TimeValue(0),
new ExecutableActions(actions),
metadata,

View File

@ -5,23 +5,23 @@
*/
package org.elasticsearch.watcher.transform.search;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.plugins.PluginsService;
import org.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.watcher.actions.ActionStatus;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
@ -30,6 +30,7 @@ import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transform.Transform;
@ -40,6 +41,8 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.junit.Test;
import java.io.IOException;
@ -51,7 +54,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -61,11 +63,12 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.elasticsearch.watcher.support.WatcherDateTimeUtils.parseDate;
import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.hamcrest.Matchers.*;
import static org.joda.time.DateTimeZone.UTC;
/**
*
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SearchTransformTests extends ElasticsearchIntegrationTest {
@Override
@ -126,7 +129,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.endObject()
.endObject());
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -163,7 +166,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.endObject()
.endObject());
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -209,7 +212,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.must(termQuery("value", "{{ctx.payload.value}}")))));
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD);
@ -269,9 +272,13 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.endObject();
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ExecutableSearchTransform executable = new SearchTransformFactory(Settings.EMPTY, ClientProxy.of(client())).parseExecutable("_id", parser);
SearchTransformFactory transformFactory = new SearchTransformFactory(Settings.EMPTY, ClientProxy.of(client()));
ExecutableSearchTransform executable = transformFactory.parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.type(), is(SearchTransform.TYPE));
assertThat(executable.transform().getRequest(), notNullValue());
@ -287,6 +294,51 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
}
@Test
public void testParser_WithIndexNames() throws Exception {
SearchType searchType = getRandomSupportedSearchType();
XContentBuilder builder = jsonBuilder().startObject();
builder.array("indices", "idx", "<idx-{now/d-3d}>");
if (searchType != null) {
builder.field("search_type", searchType.name());
}
builder.startObject("body")
.startObject("query")
.startObject("match_all")
.endObject()
.endObject()
.endObject();
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
DynamicIndexName.Parser indexNamesParser = new DynamicIndexName.Parser();
String dateFormat;
Settings settings;
if (randomBoolean()) {
dateFormat = DynamicIndexName.DEFAULT_DATE_FORMAT;
settings = Settings.EMPTY;
} else {
dateFormat = "YYYY-MM";
settings = Settings.builder()
.put("watcher.transform.search.dynamic_indices.default_date_format", dateFormat)
.build();
}
SearchTransformFactory transformFactory = new SearchTransformFactory(settings, ClientProxy.of(client()));
ExecutableSearchTransform executable = transformFactory.parseExecutable("_id", parser);
DynamicIndexName[] indexNames = executable.indexNames();
assertThat(indexNames, notNullValue());
DateTime now = DateTime.now(UTC);
String[] names = DynamicIndexName.names(indexNames, now);
assertThat(names, notNullValue());
assertThat(names.length, is(2));
assertThat(names, arrayContaining("idx", "idx-" + DateTimeFormat.forPattern(dateFormat).print(now.minusDays(3))));
}
@Test(expected = SearchTransformException.class)
public void testParser_ScanNotSupported() throws Exception {
SearchRequest request = client().prepareSearch()
@ -300,6 +352,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
DynamicIndexName.Parser indexNamesParser = new DynamicIndexName.Parser();
SearchTransformFactory factory = new SearchTransformFactory(Settings.EMPTY, ClientProxy.of(client()));
factory.parseTransform("_id", parser);
@ -319,7 +372,10 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
final String expectedQuery = "{\"template\":{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"vars\":{},\"watch_id\":\"test-watch\",\"payload\":{},\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"},\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}";
"\"include_lower\":true,\"include_upper\":true}}}}}},\"params\":{\"seconds_param\":\"30s\",\"ctx\":{" +
"\"id\":\"" + ctx.id().value() + "\",\"metadata\":null,\"vars\":{},\"watch_id\":\"test-watch\",\"payload\":{}," +
"\"trigger\":{\"triggered_time\":\"1970-01-01T00:01:00.000Z\",\"scheduled_time\":\"1970-01-01T00:01:00.000Z\"}," +
"\"execution_time\":\"1970-01-01T00:01:00.000Z\"}}}";
Map<String, Object> params = new HashMap<>();
params.put("seconds_param", "30s");
@ -348,7 +404,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +
"\"include_lower\":true,\"include_upper\":true}}}}}}";
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache","test-script", templateQuery).request();
PutIndexedScriptRequest indexedScriptRequest = client().preparePutIndexedScript("mustache", "test-script", templateQuery).request();
assertThat(client().putIndexedScript(indexedScriptRequest).actionGet().isCreated(), is(true));
Map<String, Object> params = new HashMap<>();
@ -439,7 +495,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
ensureGreen("test-search-index");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY);
}

View File

@ -6,10 +6,9 @@
package org.elasticsearch.watcher.watch;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.bytes.BytesReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -59,6 +58,7 @@ import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.Clock;
@ -94,22 +94,24 @@ import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.trigger.schedule.*;
import org.elasticsearch.watcher.trigger.schedule.support.*;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.Map;
import static org.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.test.WatcherTestUtils.matchAllRequest;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.hamcrest.Matchers.*;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
public class WatchTests extends ElasticsearchTestCase {
private ScriptServiceProxy scriptService;
private DynamicIndexName.Parser indexNamesParser;
private ClientProxy client;
private HttpClient httpClient;
private EmailService emailService;
@ -119,11 +121,13 @@ public class WatchTests extends ElasticsearchTestCase {
private SecretService secretService;
private LicenseService licenseService;
private ESLogger logger;
private DynamicIndexName.Parser indexNameParser;
private Settings settings = Settings.EMPTY;
@Before
public void init() throws Exception {
scriptService = mock(ScriptServiceProxy.class);
indexNamesParser = new DynamicIndexName.Parser();
client = mock(ClientProxy.class);
httpClient = mock(HttpClient.class);
emailService = mock(EmailService.class);
@ -133,9 +137,10 @@ public class WatchTests extends ElasticsearchTestCase {
licenseService = mock(LicenseService.class);
authRegistry = new HttpAuthRegistry(ImmutableMap.of("basic", (HttpAuthFactory) new BasicAuthFactory(secretService)));
logger = Loggers.getLogger(WatchTests.class);
indexNameParser = new DynamicIndexName.Parser();
}
@Test //@Repeat(iterations = 20)
@Test
public void testParser_SelfGenerated() throws Exception {
DateTime now = new DateTime(UTC);
ClockMock clock = new ClockMock();
@ -307,7 +312,7 @@ public class WatchTests extends ElasticsearchTestCase {
switch (type) {
case SearchInput.TYPE:
SearchInput searchInput = searchInput(WatcherTestUtils.newInputSearchRequest("idx")).build();
return new ExecutableSearchInput(searchInput, logger, client);
return new ExecutableSearchInput(searchInput, logger, client, indexNameParser);
default:
SimpleInput simpleInput = InputBuilders.simpleInput(ImmutableMap.<String, Object>builder().put("_key", "_val")).build();
return new ExecutableSimpleInput(simpleInput, logger);
@ -359,13 +364,13 @@ public class WatchTests extends ElasticsearchTestCase {
case ScriptTransform.TYPE:
return new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService);
case SearchTransform.TYPE:
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client);
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser);
default: // chain
ChainTransform chainTransform = new ChainTransform(ImmutableList.of(
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)),
new ScriptTransform(Script.inline("_script").build())));
return new ExecutableChainTransform(chainTransform, logger, ImmutableList.<ExecutableTransform>of(
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client),
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser),
new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService)));
}
}
@ -389,8 +394,8 @@ public class WatchTests extends ElasticsearchTestCase {
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer)));
}
if (randomBoolean()) {
IndexAction aciton = new IndexAction("_index", "_type", null);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(aciton, logger, client)));
IndexAction action = new IndexAction("_index", "_type", null);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(action, logger, client, indexNameParser)));
}
if (randomBoolean()) {
HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000))
@ -411,7 +416,7 @@ public class WatchTests extends ElasticsearchTestCase {
parsers.put(EmailAction.TYPE, new EmailActionFactory(settings, emailService, templateEngine, htmlSanitizer));
break;
case IndexAction.TYPE:
parsers.put(IndexAction.TYPE, new IndexActionFactory(settings, client));
parsers.put(IndexAction.TYPE, new IndexActionFactory(settings, client, indexNamesParser));
break;
case WebhookAction.TYPE:
parsers.put(WebhookAction.TYPE, new WebhookActionFactory(settings, httpClient,