Added time zone support to DynamicIndexName
Today `DynamicIndexName` generates the names based on UTC. This doesn't work for environments where the indices rollover is based on a local/different timezone. The timezone can be configured: * globally with the `watcher.dynamic_indices.time_zone` setting * collectively to all `search` input, `search` transform and `index` action with `watcher.input.search.dynamic_indices.time_zone`, `watcher.transform.search.dynamic_indices.time_zone` and `watcher.actions.index.dynamic_indices.time_zone` respectively * locally on the watch as part of the `search` input, `search` transform and/or `index` action definition. This commit revealed a set of bugs in the `search` input, `search` transform and/or `index` action when it comes to xcontent serialization. In short, when the user didn't define a timeout, a default timeout was set on them, which meant their serialized xcontent did not match the original one (from which they're deserialized). This bug also meant that the global default client timeouts (defined in the `ClientProxy` never took effect. This issue revealed the bugs above as the `WatchTests` were enhanced to execute more settings permutations Closes elastic/elasticsearch#632 Original commit: elastic/x-pack-elasticsearch@35a2191828
This commit is contained in:
parent
0a07d6dee5
commit
7cec0c97df
|
@ -10,7 +10,9 @@ import org.elasticsearch.action.bulk.BulkRequest;
|
|||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
||||
|
@ -33,12 +35,14 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
|
||||
|
||||
private final ClientProxy client;
|
||||
private final TimeValue timeout;
|
||||
private final DynamicIndexName indexName;
|
||||
|
||||
public ExecutableIndexAction(IndexAction action, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
|
||||
public ExecutableIndexAction(IndexAction action, ESLogger logger, ClientProxy client, @Nullable TimeValue defaultTimeout, DynamicIndexName.Parser indexNameParser) {
|
||||
super(action, logger);
|
||||
this.client = client;
|
||||
this.indexName = indexNameParser.parse(action.index);
|
||||
this.timeout = action.timeout != null ? action.timeout : defaultTimeout;
|
||||
this.indexName = indexNameParser.parse(action.index, action.dynamicNameTimeZone);
|
||||
}
|
||||
|
||||
DynamicIndexName indexName() {
|
||||
|
@ -83,7 +87,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
|
|||
return new IndexAction.Result.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(), XContentType.JSON));
|
||||
}
|
||||
|
||||
IndexResponse response = client.index(indexRequest, action.timeout);
|
||||
IndexResponse response = client.index(indexRequest, timeout);
|
||||
XContentBuilder jsonBuilder = jsonBuilder();
|
||||
indexResponseToXContent(jsonBuilder, response);
|
||||
return new IndexAction.Result.Success(new XContentSource(jsonBuilder));
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.watcher.actions.Action;
|
|||
import org.elasticsearch.watcher.support.DynamicIndexName;
|
||||
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
|
||||
import org.elasticsearch.watcher.support.xcontent.XContentSource;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -28,12 +29,15 @@ public class IndexAction implements Action {
|
|||
final String docType;
|
||||
final @Nullable String executionTimeField;
|
||||
final @Nullable TimeValue timeout;
|
||||
final @Nullable DateTimeZone dynamicNameTimeZone;
|
||||
|
||||
public IndexAction(String index, String docType, @Nullable String executionTimeField, @Nullable TimeValue timeout) {
|
||||
public IndexAction(String index, String docType, @Nullable String executionTimeField,
|
||||
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
|
||||
this.index = index;
|
||||
this.docType = docType;
|
||||
this.executionTimeField = executionTimeField;
|
||||
this.timeout = timeout;
|
||||
this.dynamicNameTimeZone = dynamicNameTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -53,6 +57,10 @@ public class IndexAction implements Action {
|
|||
return executionTimeField;
|
||||
}
|
||||
|
||||
public DateTimeZone getDynamicNameTimeZone() {
|
||||
return dynamicNameTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
@ -62,7 +70,10 @@ public class IndexAction implements Action {
|
|||
|
||||
if (!index.equals(that.index)) return false;
|
||||
if (!docType.equals(that.docType)) return false;
|
||||
return !(executionTimeField != null ? !executionTimeField.equals(that.executionTimeField) : that.executionTimeField != null);
|
||||
if (executionTimeField != null ? !executionTimeField.equals(that.executionTimeField) : that.executionTimeField != null)
|
||||
return false;
|
||||
if (timeout != null ? !timeout.equals(that.timeout) : that.timeout != null) return false;
|
||||
return !(dynamicNameTimeZone != null ? !dynamicNameTimeZone.equals(that.dynamicNameTimeZone) : that.dynamicNameTimeZone != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -70,6 +81,8 @@ public class IndexAction implements Action {
|
|||
int result = index.hashCode();
|
||||
result = 31 * result + docType.hashCode();
|
||||
result = 31 * result + (executionTimeField != null ? executionTimeField.hashCode() : 0);
|
||||
result = 31 * result + (timeout != null ? timeout.hashCode() : 0);
|
||||
result = 31 * result + (dynamicNameTimeZone != null ? dynamicNameTimeZone.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -83,15 +96,19 @@ public class IndexAction implements Action {
|
|||
}
|
||||
if (timeout != null) {
|
||||
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
|
||||
};
|
||||
}
|
||||
if (dynamicNameTimeZone != null) {
|
||||
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
|
||||
}
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
public static IndexAction parse(String watchId, String actionId, XContentParser parser, TimeValue defaultTimeout) throws IOException {
|
||||
public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException {
|
||||
String index = null;
|
||||
String docType = null;
|
||||
String executionTimeField = null;
|
||||
TimeValue timeout = defaultTimeout;
|
||||
TimeValue timeout = null;
|
||||
DateTimeZone dynamicNameTimeZone = null;
|
||||
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
|
@ -111,6 +128,12 @@ public class IndexAction implements Action {
|
|||
executionTimeField = parser.text();
|
||||
} else if (Field.TIMEOUT.match(currentFieldName)) {
|
||||
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
|
||||
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
|
||||
} else {
|
||||
throw new IndexActionException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
|
||||
}
|
||||
} else {
|
||||
throw new IndexActionException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName);
|
||||
}
|
||||
|
@ -127,7 +150,7 @@ public class IndexAction implements Action {
|
|||
throw new IndexActionException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, actionId, Field.DOC_TYPE.getPreferredName());
|
||||
}
|
||||
|
||||
return new IndexAction(index, docType, executionTimeField, timeout);
|
||||
return new IndexAction(index, docType, executionTimeField, timeout, dynamicNameTimeZone);
|
||||
}
|
||||
|
||||
public static Builder builder(String index, String docType) {
|
||||
|
@ -201,6 +224,7 @@ public class IndexAction implements Action {
|
|||
final String docType;
|
||||
String executionTimeField;
|
||||
TimeValue timeout;
|
||||
DateTimeZone dynamicNameTimeZone;
|
||||
|
||||
private Builder(String index, String docType) {
|
||||
this.index = index;
|
||||
|
@ -217,9 +241,14 @@ public class IndexAction implements Action {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder dynamicNameTimeZone(DateTimeZone dynamicNameTimeZone) {
|
||||
this.dynamicNameTimeZone = dynamicNameTimeZone;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexAction build() {
|
||||
return new IndexAction(index, docType, executionTimeField, timeout);
|
||||
return new IndexAction(index, docType, executionTimeField, timeout, dynamicNameTimeZone);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,5 +260,6 @@ public class IndexAction implements Action {
|
|||
ParseField RESPONSE = new ParseField("response");
|
||||
ParseField REQUEST = new ParseField("request");
|
||||
ParseField TIMEOUT = new ParseField("timeout");
|
||||
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,9 +30,8 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
|
|||
public IndexActionFactory(Settings settings, ClientProxy client) {
|
||||
super(Loggers.getLogger(ExecutableEmailAction.class, settings));
|
||||
this.client = client;
|
||||
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.actions.index");
|
||||
this.indexNamesParser = new DynamicIndexName.Parser(defaultDateFormat);
|
||||
this.defaultTimeout = settings.getAsTime("watcher.action.index.default_timeout", TimeValue.timeValueSeconds(60));
|
||||
this.indexNamesParser = new DynamicIndexName.Parser(settings, "watcher.actions.index");
|
||||
this.defaultTimeout = settings.getAsTime("watcher.actions.index.default_timeout", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -42,11 +41,11 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
|
|||
|
||||
@Override
|
||||
public IndexAction parseAction(String watchId, String actionId, XContentParser parser) throws IOException {
|
||||
return IndexAction.parse(watchId, actionId, parser, defaultTimeout);
|
||||
return IndexAction.parse(watchId, actionId, parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutableIndexAction createExecutable(IndexAction action) {
|
||||
return new ExecutableIndexAction(action, actionLogger, client, indexNamesParser);
|
||||
return new ExecutableIndexAction(action, actionLogger, client, defaultTimeout, indexNamesParser);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.watcher.actions.ActionFactory;
|
||||
import org.elasticsearch.watcher.support.http.HttpClient;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequest;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
|
||||
import org.elasticsearch.watcher.support.template.TemplateEngine;
|
||||
|
||||
|
@ -23,17 +22,15 @@ import java.io.IOException;
|
|||
public class WebhookActionFactory extends ActionFactory<WebhookAction, ExecutableWebhookAction> {
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final HttpRequest.Parser requestParser;
|
||||
private final HttpRequestTemplate.Parser requestTemplateParser;
|
||||
private final TemplateEngine templateEngine;
|
||||
|
||||
@Inject
|
||||
public WebhookActionFactory(Settings settings, HttpClient httpClient, HttpRequest.Parser requestParser,
|
||||
HttpRequestTemplate.Parser requestTemplateParser, TemplateEngine templateEngine) {
|
||||
public WebhookActionFactory(Settings settings, HttpClient httpClient, HttpRequestTemplate.Parser requestTemplateParser,
|
||||
TemplateEngine templateEngine) {
|
||||
|
||||
super(Loggers.getLogger(ExecutableWebhookAction.class, settings));
|
||||
this.httpClient = httpClient;
|
||||
this.requestParser = requestParser;
|
||||
this.requestTemplateParser = requestTemplateParser;
|
||||
this.templateEngine = templateEngine;
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.search.SearchType;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -35,13 +36,15 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
|
|||
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
|
||||
|
||||
private final ClientProxy client;
|
||||
private final @Nullable TimeValue timeout;
|
||||
private final @Nullable DynamicIndexName[] indexNames;
|
||||
|
||||
public ExecutableSearchInput(SearchInput input, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
|
||||
public ExecutableSearchInput(SearchInput input, ESLogger logger, ClientProxy client, @Nullable TimeValue defaultTimeout, DynamicIndexName.Parser indexNameParser) {
|
||||
super(input, logger);
|
||||
this.client = client;
|
||||
this.timeout = input.getTimeout() != null ? input.getTimeout() : defaultTimeout;
|
||||
String[] indices = input.getSearchRequest().indices();
|
||||
indexNames = indices != null ? indexNameParser.parse(indices) : null;
|
||||
indexNames = indices != null ? indexNameParser.parse(indices, input.getDynamicNameTimeZone()) : null;
|
||||
}
|
||||
|
||||
DynamicIndexName[] indexNames() {
|
||||
|
@ -65,7 +68,7 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
|
|||
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(source, false, true));
|
||||
}
|
||||
|
||||
SearchResponse response = client.search(request, input.getTimeout());
|
||||
SearchResponse response = client.search(request, timeout);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.watcher.support.SearchRequestParseException;
|
|||
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
|
||||
import org.elasticsearch.watcher.support.WatcherUtils;
|
||||
import org.elasticsearch.watcher.watch.Payload;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
@ -34,11 +35,14 @@ public class SearchInput implements Input {
|
|||
private final SearchRequest searchRequest;
|
||||
private final @Nullable Set<String> extractKeys;
|
||||
private final @Nullable TimeValue timeout;
|
||||
private final @Nullable DateTimeZone dynamicNameTimeZone;
|
||||
|
||||
public SearchInput(SearchRequest searchRequest, @Nullable Set<String> extractKeys, @Nullable TimeValue timeout) {
|
||||
public SearchInput(SearchRequest searchRequest, @Nullable Set<String> extractKeys,
|
||||
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
|
||||
this.searchRequest = searchRequest;
|
||||
this.extractKeys = extractKeys;
|
||||
this.timeout = timeout;
|
||||
this.dynamicNameTimeZone = dynamicNameTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -54,13 +58,17 @@ public class SearchInput implements Input {
|
|||
SearchInput that = (SearchInput) o;
|
||||
|
||||
if (!SearchRequestEquivalence.INSTANCE.equivalent(searchRequest, this.searchRequest)) return false;
|
||||
return !(extractKeys != null ? !extractKeys.equals(that.extractKeys) : that.extractKeys != null);
|
||||
if (extractKeys != null ? !extractKeys.equals(that.extractKeys) : that.extractKeys != null) return false;
|
||||
if (timeout != null ? !timeout.equals(that.timeout) : that.timeout != null) return false;
|
||||
return !(dynamicNameTimeZone != null ? !dynamicNameTimeZone.equals(that.dynamicNameTimeZone) : that.dynamicNameTimeZone != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = searchRequest.hashCode();
|
||||
result = 31 * result + (extractKeys != null ? extractKeys.hashCode() : 0);
|
||||
result = 31 * result + (timeout != null ? timeout.hashCode() : 0);
|
||||
result = 31 * result + (dynamicNameTimeZone != null ? dynamicNameTimeZone.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -76,6 +84,10 @@ public class SearchInput implements Input {
|
|||
return timeout;
|
||||
}
|
||||
|
||||
public DateTimeZone getDynamicNameTimeZone() {
|
||||
return dynamicNameTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
@ -87,14 +99,18 @@ public class SearchInput implements Input {
|
|||
if (timeout != null) {
|
||||
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
|
||||
}
|
||||
if (dynamicNameTimeZone != null) {
|
||||
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static SearchInput parse(String watchId, XContentParser parser, TimeValue defaultTimeout) throws IOException {
|
||||
public static SearchInput parse(String watchId, XContentParser parser) throws IOException {
|
||||
SearchRequest request = null;
|
||||
Set<String> extract = null;
|
||||
TimeValue timeout = defaultTimeout;
|
||||
TimeValue timeout = null;
|
||||
DateTimeZone dynamicNameTimeZone = null;
|
||||
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
|
@ -122,6 +138,12 @@ public class SearchInput implements Input {
|
|||
}
|
||||
} else if (Field.TIMEOUT.match(currentFieldName)) {
|
||||
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
|
||||
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
|
||||
} else {
|
||||
throw new SearchInputException("could not parse [{}] input for watch [{}]. failed to parse [{}]. must be a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
|
||||
}
|
||||
} else {
|
||||
throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
|
||||
}
|
||||
|
@ -130,7 +152,7 @@ public class SearchInput implements Input {
|
|||
if (request == null) {
|
||||
throw new SearchInputException("could not parse [{}] input for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
|
||||
}
|
||||
return new SearchInput(request, extract, timeout);
|
||||
return new SearchInput(request, extract, timeout, dynamicNameTimeZone);
|
||||
}
|
||||
|
||||
public static Builder builder(SearchRequest request) {
|
||||
|
@ -172,6 +194,7 @@ public class SearchInput implements Input {
|
|||
private final SearchRequest request;
|
||||
private final ImmutableSet.Builder<String> extractKeys = ImmutableSet.builder();
|
||||
private TimeValue timeout;
|
||||
private DateTimeZone dynamicNameTimeZone;
|
||||
|
||||
private Builder(SearchRequest request) {
|
||||
this.request = request;
|
||||
|
@ -192,10 +215,15 @@ public class SearchInput implements Input {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder dynamicNameTimeZone(DateTimeZone dynamicNameTimeZone) {
|
||||
this.dynamicNameTimeZone = dynamicNameTimeZone;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchInput build() {
|
||||
Set<String> keys = extractKeys.build();
|
||||
return new SearchInput(request, keys.isEmpty() ? null : keys, timeout);
|
||||
return new SearchInput(request, keys.isEmpty() ? null : keys, timeout, dynamicNameTimeZone);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -203,5 +231,6 @@ public class SearchInput implements Input {
|
|||
ParseField REQUEST = new ParseField("request");
|
||||
ParseField EXTRACT = new ParseField("extract");
|
||||
ParseField TIMEOUT = new ParseField("timeout");
|
||||
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,9 +30,8 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
|
|||
public SearchInputFactory(Settings settings, ClientProxy client) {
|
||||
super(Loggers.getLogger(ExecutableSimpleInput.class, settings));
|
||||
this.client = client;
|
||||
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.input.search");
|
||||
this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat);
|
||||
this.defaultTimeout = settings.getAsTime("watcher.input.search.default_timeout", TimeValue.timeValueSeconds(30));
|
||||
this.indexNameParser = new DynamicIndexName.Parser(settings, "watcher.input.search");
|
||||
this.defaultTimeout = settings.getAsTime("watcher.input.search.default_timeout", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -42,11 +41,11 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
|
|||
|
||||
@Override
|
||||
public SearchInput parseInput(String watchId, XContentParser parser) throws IOException {
|
||||
return SearchInput.parse(watchId, parser, defaultTimeout);
|
||||
return SearchInput.parse(watchId, parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutableSearchInput createExecutable(SearchInput input) {
|
||||
return new ExecutableSearchInput(input, inputLogger, client, indexNameParser);
|
||||
return new ExecutableSearchInput(input, inputLogger, client, defaultTimeout, indexNameParser);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,19 +5,22 @@
|
|||
*/
|
||||
package org.elasticsearch.watcher.support;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.joda.DateMathParser;
|
||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||
import org.elasticsearch.common.joda.Joda;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.watcher.WatcherException;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
|
@ -88,6 +91,20 @@ public class DynamicIndexName implements ToXContent {
|
|||
return settings.get("watcher.dynamic_indices.default_date_format", DEFAULT_DATE_FORMAT);
|
||||
}
|
||||
|
||||
public static DateTimeZone timeZone(Settings settings, String componentPrefix) {
|
||||
if (componentPrefix == null) {
|
||||
return timeZone(settings);
|
||||
}
|
||||
|
||||
String timeZoneId = settings.get(componentPrefix + ".dynamic_indices.time_zone", DateTimeZone.UTC.getID());
|
||||
return DateTimeZone.forID(timeZoneId);
|
||||
}
|
||||
|
||||
public static DateTimeZone timeZone(Settings settings) {
|
||||
String timeZoneId = settings.get("watcher.dynamic_indices.time_zone", DateTimeZone.UTC.getID());
|
||||
return DateTimeZone.forID(timeZoneId);
|
||||
}
|
||||
|
||||
interface Expression {
|
||||
|
||||
String eval(DateTime now);
|
||||
|
@ -108,6 +125,48 @@ public class DynamicIndexName implements ToXContent {
|
|||
}
|
||||
}
|
||||
|
||||
static class DateMathExpression implements Expression {
|
||||
|
||||
private final DateMathParser dateMathParser;
|
||||
private final String mathExpression;
|
||||
private final FormatDateTimeFormatter formatter;
|
||||
private final DateTimeZone timeZone;
|
||||
|
||||
public DateMathExpression(String defaultFormat, DateTimeZone timeZone, String expression) {
|
||||
this.timeZone = timeZone;
|
||||
int i = expression.indexOf(LEFT_BOUND);
|
||||
String format;
|
||||
if (i < 0) {
|
||||
mathExpression = expression;
|
||||
format = defaultFormat;
|
||||
} else {
|
||||
if (expression.lastIndexOf(RIGHT_BOUND) != expression.length() - 1) {
|
||||
throw new ParseException("invalid dynamic name expression [{}]. missing closing `}` for date math format", expression);
|
||||
}
|
||||
if (i == expression.length() - 2) {
|
||||
throw new ParseException("invalid dynamic name expression [{}]. missing date format", expression);
|
||||
}
|
||||
mathExpression = expression.substring(0, i);
|
||||
format = expression.substring(i + 1, expression.length() - 1);
|
||||
|
||||
}
|
||||
DateTimeFormatter parser = DateTimeFormat.forPattern(format).withZone(timeZone);
|
||||
formatter = new FormatDateTimeFormatter(defaultFormat, parser, Locale.ROOT);
|
||||
dateMathParser = new DateMathParser(formatter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String eval(final DateTime now) {
|
||||
long millis = dateMathParser.parse(mathExpression, new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
return now.getMillis();
|
||||
}
|
||||
}, false, timeZone);
|
||||
return formatter.printer().print(millis);
|
||||
}
|
||||
}
|
||||
|
||||
static class CompoundExpression implements Expression {
|
||||
|
||||
private final Expression[] parts;
|
||||
|
@ -125,7 +184,7 @@ public class DynamicIndexName implements ToXContent {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
static Expression parse(String defaultDateFormat, char[] text, int from, int length) {
|
||||
static Expression parse(String defaultDateFormat, DateTimeZone timeZone, char[] text, int from, int length) {
|
||||
boolean dynamic = false;
|
||||
List<Expression> expressions = new ArrayList<>();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -170,7 +229,7 @@ public class DynamicIndexName implements ToXContent {
|
|||
inDateFormat = false;
|
||||
sb.append(c);
|
||||
} else {
|
||||
expressions.add(new DateMathExpression(defaultDateFormat, sb.toString()));
|
||||
expressions.add(new DateMathExpression(defaultDateFormat, timeZone, sb.toString()));
|
||||
sb = new StringBuilder();
|
||||
inPlaceHolder = false;
|
||||
dynamic = true;
|
||||
|
@ -227,81 +286,54 @@ public class DynamicIndexName implements ToXContent {
|
|||
}
|
||||
}
|
||||
|
||||
static class DateMathExpression implements Expression {
|
||||
|
||||
private final DateMathParser dateMathParser;
|
||||
private final String mathExpression;
|
||||
private final FormatDateTimeFormatter formatter;
|
||||
|
||||
public DateMathExpression(String defaultFormat, String expression) {
|
||||
int i = expression.indexOf(LEFT_BOUND);
|
||||
if (i < 0) {
|
||||
mathExpression = expression;
|
||||
formatter = Joda.forPattern(defaultFormat);
|
||||
} else {
|
||||
if (expression.lastIndexOf(RIGHT_BOUND) != expression.length() - 1) {
|
||||
throw new ParseException("invalid dynamic name expression [{}]. missing closing `}` for date math format", expression);
|
||||
}
|
||||
if (i == expression.length() - 2) {
|
||||
throw new ParseException("invalid dynamic name expression [{}]. missing date format", expression);
|
||||
}
|
||||
mathExpression = expression.substring(0, i);
|
||||
formatter = Joda.forPattern(expression.substring(i + 1, expression.length() - 1));
|
||||
}
|
||||
dateMathParser = new DateMathParser(formatter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String eval(final DateTime now) {
|
||||
long millis = dateMathParser.parse(mathExpression, new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
return now.getMillis();
|
||||
}
|
||||
});
|
||||
return formatter.printer().print(millis);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Parser {
|
||||
|
||||
private final String defaultDateFormat;
|
||||
private final DateTimeZone timeZone;
|
||||
|
||||
public Parser() {
|
||||
this(DEFAULT_DATE_FORMAT);
|
||||
this(DEFAULT_DATE_FORMAT, DateTimeZone.UTC);
|
||||
}
|
||||
|
||||
public Parser(String defaultDateFormat) {
|
||||
public Parser(String defaultDateFormat, DateTimeZone timeZone) {
|
||||
this.defaultDateFormat = defaultDateFormat;
|
||||
this.timeZone = timeZone;
|
||||
}
|
||||
|
||||
public Parser(Settings settings, String componentPrefix) {
|
||||
this(defaultDateFormat(settings, componentPrefix), timeZone(settings, componentPrefix));
|
||||
}
|
||||
|
||||
public DynamicIndexName parse(String template) {
|
||||
return parse(template, null);
|
||||
}
|
||||
|
||||
public DynamicIndexName parse(String template, @Nullable DateTimeZone timeZone) {
|
||||
DateTimeZone tz = timeZone != null ? timeZone : this.timeZone;
|
||||
if (template == null) {
|
||||
return null;
|
||||
}
|
||||
if (!template.startsWith(EXPRESSION_LEFT_BOUND) || !template.endsWith(EXPRESSION_RIGHT_BOUND)) {
|
||||
return new DynamicIndexName(template, new StaticExpression(template));
|
||||
}
|
||||
return new DynamicIndexName(template, CompoundExpression.parse(defaultDateFormat, template.toCharArray(), 1, template.length() - 1));
|
||||
return new DynamicIndexName(template, CompoundExpression.parse(defaultDateFormat, tz, template.toCharArray(), 1, template.length() - 1));
|
||||
}
|
||||
|
||||
public DynamicIndexName[] parse(String[] templates) {
|
||||
return parse(templates, null);
|
||||
}
|
||||
|
||||
public DynamicIndexName[] parse(String[] templates, @Nullable DateTimeZone timeZone) {
|
||||
if (templates.length == 0) {
|
||||
return null;
|
||||
}
|
||||
DynamicIndexName[] dynamicIndexNames = new DynamicIndexName[templates.length];
|
||||
for (int i = 0; i < dynamicIndexNames.length; i++) {
|
||||
dynamicIndexNames[i] = parse(templates[i]);
|
||||
dynamicIndexNames[i] = parse(templates[i], timeZone);
|
||||
}
|
||||
return dynamicIndexNames;
|
||||
}
|
||||
|
||||
public DynamicIndexName parse(XContentParser parser) throws IOException {
|
||||
if (parser.currentToken() != XContentParser.Token.VALUE_STRING) {
|
||||
throw new ParseException("could not parse index name. expected a string value but found [{}] instead", parser.currentToken());
|
||||
}
|
||||
return parse(parser.text());
|
||||
}
|
||||
}
|
||||
|
||||
public static class ParseException extends WatcherException {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.watcher.support.DynamicIndexName;
|
||||
import org.elasticsearch.watcher.support.WatcherUtils;
|
||||
|
@ -25,13 +26,15 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
|
|||
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
|
||||
|
||||
protected final ClientProxy client;
|
||||
protected final @Nullable TimeValue timeout;
|
||||
private final @Nullable DynamicIndexName[] indexNames;
|
||||
|
||||
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
|
||||
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ClientProxy client, @Nullable TimeValue defaultTimeout, DynamicIndexName.Parser indexNameParser) {
|
||||
super(transform, logger);
|
||||
this.client = client;
|
||||
this.timeout = transform.getTimeout() != null ? transform.getTimeout() : defaultTimeout;
|
||||
String[] indices = transform.getRequest().indices();
|
||||
this.indexNames = indices != null ? indexNameParser.parse(indices) : null;
|
||||
this.indexNames = indices != null ? indexNameParser.parse(indices, transform.getDynamicNameTimeZone()) : null;
|
||||
}
|
||||
|
||||
DynamicIndexName[] indexNames() {
|
||||
|
@ -43,7 +46,7 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
|
|||
SearchRequest request = null;
|
||||
try {
|
||||
request = WatcherUtils.createSearchRequestFromPrototype(transform.getRequest(), indexNames, ctx, payload);
|
||||
SearchResponse resp = client.search(request, transform.getTimeout());
|
||||
SearchResponse resp = client.search(request, timeout);
|
||||
return new SearchTransform.Result(request, new Payload.XContent(resp));
|
||||
} catch (Exception e) {
|
||||
logger.error("failed to execute [{}] transform for [{}]", e, SearchTransform.TYPE, ctx.id());
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
|
|||
import org.elasticsearch.watcher.support.WatcherUtils;
|
||||
import org.elasticsearch.watcher.transform.Transform;
|
||||
import org.elasticsearch.watcher.watch.Payload;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -29,10 +30,12 @@ public class SearchTransform implements Transform {
|
|||
|
||||
private final SearchRequest request;
|
||||
private final @Nullable TimeValue timeout;
|
||||
private final @Nullable DateTimeZone dynamicNameTimeZone;
|
||||
|
||||
public SearchTransform(SearchRequest request, @Nullable TimeValue timeout) {
|
||||
public SearchTransform(SearchRequest request, @Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
|
||||
this.request = request;
|
||||
this.timeout = timeout;
|
||||
this.dynamicNameTimeZone = dynamicNameTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -48,19 +51,28 @@ public class SearchTransform implements Transform {
|
|||
return timeout;
|
||||
}
|
||||
|
||||
public DateTimeZone getDynamicNameTimeZone() {
|
||||
return dynamicNameTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
SearchTransform transform = (SearchTransform) o;
|
||||
SearchTransform that = (SearchTransform) o;
|
||||
|
||||
return SearchRequestEquivalence.INSTANCE.equivalent(request, transform.request);
|
||||
if (!SearchRequestEquivalence.INSTANCE.equivalent(request, this.request)) return false;
|
||||
if (timeout != null ? !timeout.equals(that.timeout) : that.timeout != null) return false;
|
||||
return !(dynamicNameTimeZone != null ? !dynamicNameTimeZone.equals(that.dynamicNameTimeZone) : that.dynamicNameTimeZone != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return request.hashCode();
|
||||
int result = request.hashCode();
|
||||
result = 31 * result + (timeout != null ? timeout.hashCode() : 0);
|
||||
result = 31 * result + (dynamicNameTimeZone != null ? dynamicNameTimeZone.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,13 +83,17 @@ public class SearchTransform implements Transform {
|
|||
if (timeout != null) {
|
||||
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
|
||||
}
|
||||
if (dynamicNameTimeZone != null) {
|
||||
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static SearchTransform parse(String watchId, XContentParser parser, TimeValue defaultTimeout) throws IOException {
|
||||
public static SearchTransform parse(String watchId, XContentParser parser) throws IOException {
|
||||
SearchRequest request = null;
|
||||
TimeValue timeout = defaultTimeout;
|
||||
TimeValue timeout = null;
|
||||
DateTimeZone dynamicNameTimeZone = null;
|
||||
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
|
@ -92,6 +108,12 @@ public class SearchTransform implements Transform {
|
|||
}
|
||||
} else if (Field.TIMEOUT.match(currentFieldName)) {
|
||||
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
|
||||
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
|
||||
} else {
|
||||
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. failed to parse [{}]. must be a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
|
||||
}
|
||||
} else {
|
||||
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. unexpected field [{}]", TYPE, watchId, currentFieldName);
|
||||
}
|
||||
|
@ -100,7 +122,7 @@ public class SearchTransform implements Transform {
|
|||
if (request == null) {
|
||||
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
|
||||
}
|
||||
return new SearchTransform(request, timeout);
|
||||
return new SearchTransform(request, timeout, dynamicNameTimeZone);
|
||||
}
|
||||
|
||||
public static Builder builder(SearchRequest request) {
|
||||
|
@ -141,6 +163,7 @@ public class SearchTransform implements Transform {
|
|||
|
||||
private final SearchRequest request;
|
||||
private TimeValue timeout;
|
||||
private DateTimeZone dynamicNameTimeZone;
|
||||
|
||||
public Builder(SearchRequest request) {
|
||||
this.request = request;
|
||||
|
@ -151,14 +174,20 @@ public class SearchTransform implements Transform {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder dynamicNameTimeZone(DateTimeZone dynamicNameTimeZone) {
|
||||
this.dynamicNameTimeZone = dynamicNameTimeZone;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchTransform build() {
|
||||
return new SearchTransform(request, timeout);
|
||||
return new SearchTransform(request, timeout, dynamicNameTimeZone);
|
||||
}
|
||||
}
|
||||
|
||||
public interface Field extends Transform.Field {
|
||||
ParseField REQUEST = new ParseField("request");
|
||||
ParseField TIMEOUT = new ParseField("timeout");
|
||||
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,9 +29,8 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
|
|||
public SearchTransformFactory(Settings settings, ClientProxy client) {
|
||||
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));
|
||||
this.client = client;
|
||||
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.transform.search");
|
||||
this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat);
|
||||
this.defaultTimeout = settings.getAsTime("watcher.transform.search.default_timeout", TimeValue.timeValueSeconds(30));
|
||||
this.indexNameParser = new DynamicIndexName.Parser(settings, "watcher.transform.search");
|
||||
this.defaultTimeout = settings.getAsTime("watcher.transform.search.default_timeout", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -41,11 +40,11 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
|
|||
|
||||
@Override
|
||||
public SearchTransform parseTransform(String watchId, XContentParser parser) throws IOException {
|
||||
return SearchTransform.parse(watchId, parser, defaultTimeout);
|
||||
return SearchTransform.parse(watchId, parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutableSearchTransform createExecutable(SearchTransform transform) {
|
||||
return new ExecutableSearchTransform(transform, transformLogger, client, indexNameParser);
|
||||
return new ExecutableSearchTransform(transform, transformLogger, client, defaultTimeout, indexNameParser);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.watcher.support.xcontent.XContentSource;
|
|||
import org.elasticsearch.watcher.test.WatcherTestUtils;
|
||||
import org.elasticsearch.watcher.watch.Payload;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -73,8 +74,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
|
|||
.get().isAcknowledged(), is(true));
|
||||
}
|
||||
|
||||
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null);
|
||||
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null, null);
|
||||
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
DateTime executionTime = DateTime.now(UTC);
|
||||
Payload payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", ImmutableMap.of("foo", "bar"));
|
||||
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
|
||||
|
@ -136,8 +137,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
|
|||
ImmutableSet.of(ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1"))
|
||||
);
|
||||
|
||||
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null);
|
||||
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null, null);
|
||||
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
DateTime executionTime = DateTime.now(UTC);
|
||||
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list));
|
||||
|
||||
|
@ -215,32 +216,48 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
|
|||
if (timestampField != null) {
|
||||
assertThat(executable.action().executionTimeField, equalTo(timestampField));
|
||||
}
|
||||
if (writeTimeout != null) {
|
||||
assertThat(executable.action().timeout, equalTo(writeTimeout));
|
||||
} else {
|
||||
// default:
|
||||
assertThat(executable.action().timeout, equalTo(TimeValue.timeValueSeconds(60)));
|
||||
}
|
||||
assertThat(executable.action().timeout, equalTo(writeTimeout));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParser_DynamicIndex() throws Exception {
|
||||
|
||||
DateTime now = DateTime.now(UTC);
|
||||
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.forOffsetHours(-2) : null;
|
||||
if (timeZone != null) {
|
||||
now = now.withHourOfDay(0).withMinuteOfHour(0);
|
||||
}
|
||||
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
builder.startObject()
|
||||
.field(IndexAction.Field.INDEX.getPreferredName(), "<idx-{now/d}>")
|
||||
.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type")
|
||||
.endObject();
|
||||
.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type");
|
||||
|
||||
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()));
|
||||
boolean timeZoneInWatch = randomBoolean();
|
||||
if (timeZone != null && timeZoneInWatch) {
|
||||
builder.field(IndexAction.Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), timeZone);
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
|
||||
Settings.Builder settings = Settings.builder();
|
||||
if (timeZone != null && !timeZoneInWatch) {
|
||||
settings.put("watcher.actions.index.dynamic_indices.time_zone", timeZone);
|
||||
}
|
||||
|
||||
IndexActionFactory actionParser = new IndexActionFactory(settings.build(), ClientProxy.of(client()));
|
||||
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
|
||||
ExecutableIndexAction executable = actionParser.parseExecutable(randomAsciiOfLength(5), randomAsciiOfLength(3), parser);
|
||||
|
||||
DateTime now = DateTime.now(UTC);
|
||||
assertThat(executable, notNullValue());
|
||||
assertThat(executable.action().index, is("<idx-{now/d}>"));
|
||||
assertThat(executable.indexName().name(now), is("idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
|
||||
String indexName = executable.indexName().name(now);
|
||||
if (timeZone != null) {
|
||||
now = now.withZone(timeZone);
|
||||
}
|
||||
assertThat(indexName, is("idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.watcher.actions.webhook;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.joda.time.DateTime;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -34,6 +33,7 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
|
|||
import org.elasticsearch.watcher.watch.Payload;
|
||||
import org.elasticsearch.watcher.watch.Watch;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -43,13 +43,13 @@ import java.io.IOException;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
@ -217,8 +217,7 @@ public class WebhookActionTests extends ElasticsearchTestCase {
|
|||
}
|
||||
|
||||
private WebhookActionFactory webhookFactory(HttpClient client) {
|
||||
return new WebhookActionFactory(Settings.EMPTY, client, new HttpRequest.Parser(authRegistry),
|
||||
new HttpRequestTemplate.Parser(authRegistry), templateEngine);
|
||||
return new WebhookActionFactory(Settings.EMPTY, client, new HttpRequestTemplate.Parser(authRegistry), templateEngine);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.watcher.watch.Payload;
|
|||
import org.elasticsearch.watcher.watch.Watch;
|
||||
import org.elasticsearch.watcher.watch.WatchStatus;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -115,7 +116,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
|
|||
.request()
|
||||
.source(searchSourceBuilder);
|
||||
|
||||
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
WatchExecutionContext ctx = new TriggeredExecutionContext(
|
||||
new Watch("test-watch",
|
||||
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
|
||||
|
@ -222,7 +223,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
|
|||
.request()
|
||||
.source(searchSourceBuilder);
|
||||
|
||||
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
WatchExecutionContext ctx = new TriggeredExecutionContext(
|
||||
new Watch("test-watch",
|
||||
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
|
||||
|
@ -254,7 +255,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
|
|||
.query(filteredQuery(matchQuery("event_type", "a"), rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
|
||||
|
||||
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null;
|
||||
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, timeout));
|
||||
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, timeout, null));
|
||||
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
|
||||
|
@ -262,44 +263,58 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
SearchInput searchInput = factory.parseInput("_id", parser);
|
||||
assertEquals(SearchInput.TYPE, searchInput.type());
|
||||
assertThat(searchInput.getTimeout(), equalTo(timeout != null ? timeout : TimeValue.timeValueSeconds(30))); // 30s is the default
|
||||
assertThat(searchInput.getTimeout(), equalTo(timeout));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParser_IndexNames() throws Exception {
|
||||
SearchRequest request = client().prepareSearch()
|
||||
.setSearchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE)
|
||||
.setIndices("test", "<test-{now/M-1M}>")
|
||||
.setIndices("test", "<test-{now/d-1d}>")
|
||||
.request()
|
||||
.source(searchSource()
|
||||
.query(boolQuery().must(matchQuery("event_type", "a")).filter(rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
|
||||
|
||||
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null));
|
||||
DateTime now = DateTime.now(UTC);
|
||||
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.forOffsetHours(-2) : null;
|
||||
if (timeZone != null) {
|
||||
now = now.withHourOfDay(0).withMinuteOfHour(0);
|
||||
}
|
||||
|
||||
boolean timeZoneInWatch = randomBoolean();
|
||||
SearchInput input = timeZone != null && timeZoneInWatch ?
|
||||
new SearchInput(request, null, null, timeZone) :
|
||||
new SearchInput(request, null, null, null);
|
||||
|
||||
XContentBuilder builder = jsonBuilder().value(input);
|
||||
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
|
||||
String dateFormat;
|
||||
Settings settings;
|
||||
Settings.Builder settingsBuilder = Settings.builder();
|
||||
if (randomBoolean()) {
|
||||
dateFormat = DynamicIndexName.DEFAULT_DATE_FORMAT;
|
||||
settings = Settings.EMPTY;
|
||||
} else {
|
||||
dateFormat = "YYYY-MM";
|
||||
settings = Settings.builder()
|
||||
.put("watcher.input.search.dynamic_indices.default_date_format", dateFormat)
|
||||
.build();
|
||||
dateFormat = "YYYY-MM-dd";
|
||||
settingsBuilder.put("watcher.input.search.dynamic_indices.default_date_format", dateFormat);
|
||||
}
|
||||
if (timeZone != null && !timeZoneInWatch) {
|
||||
settingsBuilder.put("watcher.input.search.dynamic_indices.time_zone", timeZone);
|
||||
}
|
||||
|
||||
SearchInputFactory factory = new SearchInputFactory(settings, ClientProxy.of(client()));
|
||||
SearchInputFactory factory = new SearchInputFactory(settingsBuilder.build(), ClientProxy.of(client()));
|
||||
|
||||
ExecutableSearchInput executable = factory.parseExecutable("_id", parser);
|
||||
DynamicIndexName[] indexNames = executable.indexNames();
|
||||
assertThat(indexNames, notNullValue());
|
||||
DateTime now = DateTime.now(UTC);
|
||||
|
||||
String[] names = DynamicIndexName.names(indexNames, now);
|
||||
assertThat(names, notNullValue());
|
||||
assertThat(names.length, is(2));
|
||||
assertThat(names, arrayContaining("test", "test-" + DateTimeFormat.forPattern(dateFormat).print(now.withDayOfMonth(1).minusMonths(1))));
|
||||
if (timeZone != null) {
|
||||
now = now.withZone(timeZone);
|
||||
}
|
||||
assertThat(names, arrayContaining("test", "test-" + DateTimeFormat.forPattern(dateFormat).print(now.minusDays(1))));
|
||||
}
|
||||
|
||||
@Test(expected = SearchInputException.class)
|
||||
|
@ -310,7 +325,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
|
|||
.source(searchSource()
|
||||
.query(filteredQuery(matchQuery("event_type", "a"), rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
|
||||
|
||||
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null));
|
||||
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null, null));
|
||||
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
|
||||
|
@ -343,7 +358,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
SearchInput si = siBuilder.build();
|
||||
|
||||
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
return searchInput.execute(ctx);
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
public void testNormal() throws Exception {
|
||||
String indexName = randomAsciiOfLength(10);
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse(indexName);
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(indexName));
|
||||
|
@ -34,7 +34,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testExpression() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse("<.marvel-{now}>");
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
|
||||
|
@ -42,7 +42,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
|
||||
@Test
|
||||
public void testNullOrEmpty() throws Exception {
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexName = parser.parse((String) null);
|
||||
assertThat(indexName, nullValue());
|
||||
DynamicIndexName[] indexNames = parser.parse(Strings.EMPTY_ARRAY);
|
||||
|
@ -52,7 +52,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testExpression_Static() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse("<.marvel-test>");
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(".marvel-test"));
|
||||
|
@ -61,7 +61,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testExpression_MultiParts() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse("<.text1-{now/d}-text2-{now/M}>");
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(".text1-"
|
||||
|
@ -73,16 +73,79 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testExpression_CustomFormat() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{YYYY.MM.dd}}>");
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpression_CustomTimeZone() throws Exception {
|
||||
DateTimeZone timeZone;
|
||||
int hoursOffset;
|
||||
int minutesOffset = 0;
|
||||
if (randomBoolean()) {
|
||||
hoursOffset = randomIntBetween(-12, 14);
|
||||
timeZone = DateTimeZone.forOffsetHours(hoursOffset);
|
||||
} else {
|
||||
hoursOffset = randomIntBetween(-11, 13);
|
||||
minutesOffset = randomIntBetween(0, 59);
|
||||
timeZone = DateTimeZone.forOffsetHoursMinutes(hoursOffset, minutesOffset);
|
||||
}
|
||||
DateTime now;
|
||||
if (hoursOffset >= 0) {
|
||||
// rounding to next day 00:00
|
||||
now = DateTime.now(DateTimeZone.UTC).plusHours(hoursOffset).plusMinutes(minutesOffset).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0);
|
||||
} else {
|
||||
// rounding to today 00:00
|
||||
now = DateTime.now(DateTimeZone.UTC).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0);
|
||||
}
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", timeZone);
|
||||
DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{YYYY.MM.dd}}>");
|
||||
String name = indexNames.name(now);
|
||||
logger.info("timezone: [{}], now [{}], name: [{}]", timeZone, now, name);
|
||||
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now.withZone(timeZone))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpression_CustomTimeZone_OnParse() throws Exception {
|
||||
DateTimeZone timeZone;
|
||||
int hoursOffset;
|
||||
int minutesOffset = 0;
|
||||
if (randomBoolean()) {
|
||||
hoursOffset = randomIntBetween(-12, 14);
|
||||
timeZone = DateTimeZone.forOffsetHours(hoursOffset);
|
||||
} else {
|
||||
hoursOffset = randomIntBetween(-11, 13);
|
||||
minutesOffset = randomIntBetween(0, 59);
|
||||
timeZone = DateTimeZone.forOffsetHoursMinutes(hoursOffset, minutesOffset);
|
||||
}
|
||||
DateTime now;
|
||||
if (hoursOffset >= 0) {
|
||||
// rounding to next day 00:00
|
||||
now = DateTime.now(DateTimeZone.UTC).plusHours(hoursOffset).plusMinutes(minutesOffset).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0);
|
||||
} else {
|
||||
// rounding to today 00:00
|
||||
now = DateTime.now(DateTimeZone.UTC).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0);
|
||||
}
|
||||
Settings settings = Settings.builder()
|
||||
.put("watcher.dynamic_indices.default_date_format", "YYYY.MM.dd")
|
||||
.put("watcher.dynamic_indices.time_zone", "-12")
|
||||
.put("watcher.foo.dynamic_indices.time_zone", "-12")
|
||||
.build();
|
||||
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser(settings, "watcher.foo");
|
||||
DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{YYYY.MM.dd}}>", timeZone);
|
||||
String name = indexNames.name(now);
|
||||
logger.info("timezone: [{}], now [{}], name: [{}]", timeZone, now, name);
|
||||
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now.withZone(timeZone))));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpression_EscapeStatic() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse("<.mar\\{v\\}el-{now/d}>");
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(".mar{v}el-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(now)));
|
||||
|
@ -91,7 +154,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testExpression_EscapeDateFormat() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName indexNames = parser.parse("<.marvel-{now/d{'\\{year\\}'YYYY}}>");
|
||||
String name = indexNames.name(now);
|
||||
assertThat(name, equalTo(".marvel-" + DateTimeFormat.forPattern("'{year}'YYYY").print(now)));
|
||||
|
@ -100,7 +163,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testExpression_MixedArray() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
DynamicIndexName[] indexNames = parser.parse(new String[] {
|
||||
"name1",
|
||||
"<.marvel-{now/d}>",
|
||||
|
@ -121,33 +184,35 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
|
||||
@Test(expected = DynamicIndexName.ParseException.class)
|
||||
public void testExpression_Invalid_Unescaped() throws Exception {
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
parser.parse("<.mar}vel-{now/d}>");
|
||||
}
|
||||
|
||||
@Test(expected = DynamicIndexName.ParseException.class)
|
||||
public void testExpression_Invalid_DateMathFormat() throws Exception {
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
parser.parse("<.marvel-{now/d{}>");
|
||||
}
|
||||
|
||||
@Test(expected = DynamicIndexName.ParseException.class)
|
||||
public void testExpression_Invalid_EmptyDateMathFormat() throws Exception {
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
parser.parse("<.marvel-{now/d{}}>");
|
||||
}
|
||||
|
||||
@Test(expected = DynamicIndexName.ParseException.class)
|
||||
public void testExpression_Invalid_OpenEnded() throws Exception {
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd");
|
||||
DynamicIndexName.Parser parser = new DynamicIndexName.Parser("YYYY.MM.dd", DateTimeZone.UTC);
|
||||
parser.parse("<.marvel-{now/d>");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultDateFormat_Default() throws Exception {
|
||||
String dateFormat = DynamicIndexName.defaultDateFormat(Settings.EMPTY);
|
||||
assertThat(dateFormat, is("YYYY.MM.dd"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultDateFormat() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put("watcher.dynamic_indices.default_date_format", "YYYY.MM")
|
||||
|
@ -158,6 +223,7 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
assertThat(dateFormat, is("YYYY.MM"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultDateFormat_Component() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put("watcher.dynamic_indices.default_date_format", "YYYY.MM")
|
||||
|
@ -166,4 +232,34 @@ public class DynamicIndexNameTests extends ElasticsearchTestCase {
|
|||
String dateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.foo");
|
||||
assertThat(dateFormat, is("YYY.MM"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeZone_Default() throws Exception {
|
||||
DateTimeZone timeZone = DynamicIndexName.timeZone(Settings.EMPTY);
|
||||
assertThat(timeZone, is(DateTimeZone.UTC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeZone() throws Exception {
|
||||
DateTimeZone timeZone = DateTimeZone.forOffsetHours(randomIntBetween(-12, 14));
|
||||
Settings settings = Settings.builder()
|
||||
.put("watcher.dynamic_indices.time_zone", timeZone)
|
||||
.build();
|
||||
DateTimeZone resolvedTimeZone = randomBoolean() ?
|
||||
DynamicIndexName.timeZone(settings) :
|
||||
DynamicIndexName.timeZone(settings, null);
|
||||
assertThat(timeZone, is(resolvedTimeZone));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeZone_Component() throws Exception {
|
||||
DateTimeZone timeZone = DateTimeZone.forOffsetHours(randomIntBetween(-11, 14));
|
||||
Settings settings = Settings.builder()
|
||||
.put("watcher.dynamic_indices.time_zone", "-12")
|
||||
.put("watcher.foo.dynamic_indices.time_zone", timeZone)
|
||||
.build();
|
||||
DateTimeZone resolvedTimeZone = DynamicIndexName.timeZone(settings, "watcher.foo");
|
||||
assertThat(resolvedTimeZone, is(timeZone));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,20 +5,19 @@
|
|||
*/
|
||||
package org.elasticsearch.watcher.test;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.script.ScriptContextRegistry;
|
||||
import org.joda.time.DateTime;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.script.ScriptContextRegistry;
|
||||
import org.elasticsearch.script.ScriptEngineService;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
|
||||
|
@ -66,6 +65,7 @@ import org.elasticsearch.watcher.watch.Payload;
|
|||
import org.elasticsearch.watcher.watch.Watch;
|
||||
import org.elasticsearch.watcher.watch.WatchStatus;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.mail.internet.AddressException;
|
||||
import java.io.IOException;
|
||||
|
@ -75,10 +75,10 @@ import java.util.*;
|
|||
|
||||
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt;
|
||||
import static org.apache.lucene.util.LuceneTestCase.createTempDir;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -223,7 +223,7 @@ public final class WatcherTestUtils {
|
|||
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
|
||||
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
|
||||
new ExecutableScriptCondition(new ScriptCondition(Script.inline("return true").build()), logger, scriptService),
|
||||
new ExecutableSearchTransform(new SearchTransform(transformRequest, null), logger, client, new DynamicIndexName.Parser()),
|
||||
new ExecutableSearchTransform(new SearchTransform(transformRequest, null, null), logger, client, null, new DynamicIndexName.Parser()),
|
||||
new TimeValue(0),
|
||||
new ExecutableActions(actions),
|
||||
metadata,
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.watcher.transform.search;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
@ -43,6 +44,7 @@ import org.elasticsearch.watcher.watch.Payload;
|
|||
import org.elasticsearch.watcher.watch.Watch;
|
||||
import org.elasticsearch.watcher.watch.WatchStatus;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -130,7 +132,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
|
|||
.endObject()
|
||||
.endObject());
|
||||
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
|
||||
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
|
||||
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
|
||||
|
||||
|
@ -167,7 +169,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
|
|||
.endObject()
|
||||
.endObject());
|
||||
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
|
||||
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
|
||||
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
|
||||
|
||||
|
@ -213,7 +215,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
|
|||
.must(termQuery("value", "{{ctx.payload.value}}")))));
|
||||
|
||||
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
|
||||
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC));
|
||||
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD);
|
||||
|
@ -299,7 +301,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
|
|||
assertThat(executable.transform().getRequest().templateSource().toUtf8(), equalTo("{\"file\":\"template1\"}"));
|
||||
}
|
||||
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
|
||||
assertThat(executable.transform().getTimeout(), equalTo(readTimeout != null ? readTimeout : TimeValue.timeValueSeconds(30))); // 30s is the default
|
||||
assertThat(executable.transform().getTimeout(), equalTo(readTimeout));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -312,40 +314,55 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
|
|||
builder.field("search_type", searchType.name());
|
||||
}
|
||||
|
||||
DateTime now = DateTime.now(UTC);
|
||||
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.forOffsetHours(-2) : null;
|
||||
if (timeZone != null) {
|
||||
now = now.withHourOfDay(0).withMinuteOfHour(0);
|
||||
}
|
||||
|
||||
builder.startObject("body")
|
||||
.startObject("query")
|
||||
.startObject("match_all")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
|
||||
builder.endObject();
|
||||
|
||||
boolean timeZoneInWatch = randomBoolean();
|
||||
if (timeZone != null && timeZoneInWatch) {
|
||||
builder.field(SearchTransform.Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), timeZone);
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
|
||||
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
|
||||
DynamicIndexName.Parser indexNamesParser = new DynamicIndexName.Parser();
|
||||
String dateFormat;
|
||||
Settings settings;
|
||||
Settings.Builder settingsBuilder = Settings.builder();
|
||||
if (randomBoolean()) {
|
||||
dateFormat = DynamicIndexName.DEFAULT_DATE_FORMAT;
|
||||
settings = Settings.EMPTY;
|
||||
} else {
|
||||
dateFormat = "YYYY-MM";
|
||||
settings = Settings.builder()
|
||||
.put("watcher.transform.search.dynamic_indices.default_date_format", dateFormat)
|
||||
.build();
|
||||
settingsBuilder.put("watcher.transform.search.dynamic_indices.default_date_format", dateFormat);
|
||||
}
|
||||
if (timeZone != null && !timeZoneInWatch) {
|
||||
settingsBuilder.put("watcher.transform.search.dynamic_indices.time_zone", timeZone);
|
||||
}
|
||||
Settings settings = settingsBuilder.build();
|
||||
|
||||
SearchTransformFactory transformFactory = new SearchTransformFactory(settings, ClientProxy.of(client()));
|
||||
|
||||
ExecutableSearchTransform executable = transformFactory.parseExecutable("_id", parser);
|
||||
DynamicIndexName[] indexNames = executable.indexNames();
|
||||
assertThat(indexNames, notNullValue());
|
||||
DateTime now = DateTime.now(UTC);
|
||||
|
||||
String[] names = DynamicIndexName.names(indexNames, now);
|
||||
assertThat(names, notNullValue());
|
||||
assertThat(names.length, is(2));
|
||||
if (timeZone != null) {
|
||||
now = now.withZone(timeZone);
|
||||
}
|
||||
assertThat(names, arrayContaining("idx", "idx-" + DateTimeFormat.forPattern(dateFormat).print(now.minusDays(3))));
|
||||
}
|
||||
|
||||
|
@ -505,7 +522,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
|
|||
ensureGreen("test-search-index");
|
||||
|
||||
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
|
||||
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
|
||||
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()), null, new DynamicIndexName.Parser());
|
||||
|
||||
return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY);
|
||||
}
|
||||
|
|
|
@ -66,7 +66,6 @@ import org.elasticsearch.watcher.support.clock.ClockMock;
|
|||
import org.elasticsearch.watcher.support.clock.SystemClock;
|
||||
import org.elasticsearch.watcher.support.http.HttpClient;
|
||||
import org.elasticsearch.watcher.support.http.HttpMethod;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequest;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthFactory;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
|
||||
|
@ -95,6 +94,7 @@ import org.elasticsearch.watcher.trigger.TriggerService;
|
|||
import org.elasticsearch.watcher.trigger.schedule.*;
|
||||
import org.elasticsearch.watcher.trigger.schedule.support.*;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -310,7 +310,7 @@ public class WatchTests extends ElasticsearchTestCase {
|
|||
switch (type) {
|
||||
case SearchInput.TYPE:
|
||||
SearchInput searchInput = searchInput(WatcherTestUtils.newInputSearchRequest("idx")).build();
|
||||
return new ExecutableSearchInput(searchInput, logger, client, indexNameParser);
|
||||
return new ExecutableSearchInput(searchInput, logger, client, null, indexNameParser);
|
||||
default:
|
||||
SimpleInput simpleInput = InputBuilders.simpleInput(ImmutableMap.<String, Object>builder().put("_key", "_val")).build();
|
||||
return new ExecutableSimpleInput(simpleInput, logger);
|
||||
|
@ -359,17 +359,18 @@ public class WatchTests extends ElasticsearchTestCase {
|
|||
private ExecutableTransform randomTransform() {
|
||||
String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE);
|
||||
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(5) : null;
|
||||
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
|
||||
switch (type) {
|
||||
case ScriptTransform.TYPE:
|
||||
return new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService);
|
||||
case SearchTransform.TYPE:
|
||||
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), logger, client, indexNameParser);
|
||||
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout, timeZone), logger, client, null, indexNameParser);
|
||||
default: // chain
|
||||
ChainTransform chainTransform = new ChainTransform(ImmutableList.of(
|
||||
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout),
|
||||
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout, timeZone),
|
||||
new ScriptTransform(Script.inline("_script").build())));
|
||||
return new ExecutableChainTransform(chainTransform, logger, ImmutableList.<ExecutableTransform>of(
|
||||
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), logger, client, indexNameParser),
|
||||
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout, timeZone), logger, client, null, indexNameParser),
|
||||
new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService)));
|
||||
}
|
||||
}
|
||||
|
@ -393,8 +394,10 @@ public class WatchTests extends ElasticsearchTestCase {
|
|||
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer)));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
IndexAction action = new IndexAction("_index", "_type", null, null);
|
||||
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(action, logger, client, indexNameParser)));
|
||||
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
|
||||
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(30) : null;
|
||||
IndexAction action = new IndexAction("_index", "_type", null, timeout, timeZone);
|
||||
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(action, logger, client, null, indexNameParser)));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000))
|
||||
|
@ -418,8 +421,7 @@ public class WatchTests extends ElasticsearchTestCase {
|
|||
parsers.put(IndexAction.TYPE, new IndexActionFactory(settings, client));
|
||||
break;
|
||||
case WebhookAction.TYPE:
|
||||
parsers.put(WebhookAction.TYPE, new WebhookActionFactory(settings, httpClient,
|
||||
new HttpRequest.Parser(authRegistry), new HttpRequestTemplate.Parser(authRegistry), templateEngine));
|
||||
parsers.put(WebhookAction.TYPE, new WebhookActionFactory(settings, httpClient, new HttpRequestTemplate.Parser(authRegistry), templateEngine));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue