Watcher: Remove support for _timestamp field in index action (elastic/elasticsearch#2575)

The watch index action was using the _timestamp field by default.
This functionality now needs to be configured explicitely for a special
field that is part of that document which is going to be indexed.

Relates elastic/elasticsearchelastic/elasticsearch#18980

Original commit: elastic/x-pack-elasticsearch@dfa4cf2296
This commit is contained in:
Alexander Reelsen 2016-06-22 14:31:27 +02:00 committed by GitHub
parent 694a91cd2f
commit 5883efc976
7 changed files with 56 additions and 186 deletions

View File

@ -66,13 +66,10 @@ public class GroovyScriptConditionIT extends AbstractWatcherIntegrationTestCase
}
public void testGroovyClosureWithAggregations() throws Exception {
client().admin().indices().prepareCreate(".monitoring")
.addMapping("cluster_stats", "_timestamp", "enabled=true")
.get();
for (int seconds = 0; seconds < 60; seconds += 5) {
client().prepareIndex(".monitoring", "cluster_stats").setTimestamp("2005-01-01T00:00:" +
String.format(Locale.ROOT, "%02d", seconds)).setSource("status", randomFrom("green", "yellow")).get();
String timestamp = "2005-01-01T00:00:" + String.format(Locale.ROOT, "%02d", seconds);
client().prepareIndex(".monitoring", "cluster_stats")
.setSource("status", randomFrom("green", "yellow"), "@timestamp", timestamp).get();
}
refresh();
@ -80,7 +77,7 @@ public class GroovyScriptConditionIT extends AbstractWatcherIntegrationTestCase
SearchRequestBuilder builder = client().prepareSearch(".monitoring")
.addAggregation(
AggregationBuilders
.dateHistogram("minutes").field("_timestamp").interval(TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS))
.dateHistogram("minutes").field("@timestamp").interval(TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS))
.order(Histogram.Order.COUNT_DESC)
.subAggregation(AggregationBuilders.terms("status").field("status.keyword").size(3)));
SearchResponse unmetResponse = builder.get();
@ -101,8 +98,8 @@ public class GroovyScriptConditionIT extends AbstractWatcherIntegrationTestCase
assertFalse(condition.execute(unmetContext).met());
for (int seconds = 0; seconds < 60; seconds += 5) {
client().prepareIndex(".monitoring", "cluster_stats").setTimestamp("2005-01-01T00:01:" +
String.format(Locale.ROOT, "%02d", seconds)).setSource("status", randomFrom("red")).get();
String timestamp = "2005-01-01T00:01:" + String.format(Locale.ROOT, "%02d", seconds);
client().prepareIndex(".monitoring", "cluster_stats").setSource("status", randomFrom("red"), "@timestamp", timestamp).get();
}
refresh();

View File

@ -20,11 +20,11 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Script;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.junit.After;
@ -61,18 +61,14 @@ public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase
}
public void testExecuteWithAggs() throws Exception {
client().admin().indices().prepareCreate("my-index")
.addMapping("my-type", "_timestamp", "enabled=true")
.get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:00").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:10").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:20").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:30").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:00").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:10").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:20").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:30").get();
refresh();
SearchResponse response = client().prepareSearch("my-index")
.addAggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp")
.addAggregation(AggregationBuilders.dateHistogram("rate").field("@timestamp")
.dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.get();
@ -83,10 +79,10 @@ public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(condition.execute(ctx).met());
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:40").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:40").get();
refresh();
response = client().prepareSearch("my-index").addAggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp")
response = client().prepareSearch("my-index").addAggregation(AggregationBuilders.dateHistogram("rate").field("@timestamp")
.dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.get();

View File

@ -15,7 +15,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ExecutableAction;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
@ -24,6 +23,7 @@ import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.HashMap;
@ -63,19 +63,9 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) {
if (!(data instanceof HashMap)) {
data = new HashMap<>(data); // ensuring mutability
}
data.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(ctx.executionTime()));
} else {
indexRequest.timestamp(WatcherDateTimeUtils.formatDate(ctx.executionTime()));
}
data = addTimestampToDocument(data, ctx.executionTime());
indexRequest.source(jsonBuilder().prettyPrint().map(data));
if (ctx.simulateAction(actionId)) {
@ -100,14 +90,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
if (action.executionTimeField != null && !TimestampFieldMapper.NAME.equals(action.executionTimeField)) {
if (!(doc instanceof HashMap)) {
doc = new HashMap<>(doc); // ensuring mutability
}
doc.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(ctx.executionTime()));
} else {
indexRequest.timestamp(WatcherDateTimeUtils.formatDate(ctx.executionTime()));
}
doc = addTimestampToDocument(doc, ctx.executionTime());
indexRequest.source(jsonBuilder().prettyPrint().map(doc));
bulkRequest.add(indexRequest);
}
@ -121,6 +104,16 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
return new IndexAction.Result.Success(new XContentSource(jsonBuilder.bytes(), XContentType.JSON));
}
private Map<String, Object> addTimestampToDocument(Map<String, Object> data, DateTime executionTime) {
if (action.executionTimeField != null) {
if (!(data instanceof HashMap)) {
data = new HashMap<>(data); // ensuring mutability
}
data.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(executionTime));
}
return data;
}
static void indexResponseToXContent(XContentBuilder builder, IndexResponse response) throws IOException {
builder.startObject()
.field("created", response.isCreated())

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -64,14 +65,8 @@ public class IndexActionTests extends ESIntegTestCase {
}
public void testIndexActionExecuteSingleDoc() throws Exception {
String timestampField = randomFrom(null, "_timestamp", "@timestamp");
boolean customTimestampField = "@timestamp".equals(timestampField);
if (timestampField == null || "_timestamp".equals(timestampField)) {
assertThat(prepareCreate("test-index")
.addMapping("test-type", "{ \"test-type\" : { \"_timestamp\" : { \"enabled\" : \"true\" }}}")
.get().isAcknowledged(), is(true));
}
String timestampField = randomFrom(null, "@timestamp");
boolean customTimestampField = timestampField != null;
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
@ -92,12 +87,15 @@ public class IndexActionTests extends ESIntegTestCase {
refresh(); //Manually refresh to make sure data is available
SearchResponse searchResponse = client().prepareSearch("test-index")
SearchRequestBuilder searchRequestbuilder = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource()
.query(matchAllQuery())
.aggregation(terms("timestamps").field(customTimestampField ? timestampField : "_timestamp")))
.get();
.setSource(searchSource().query(matchAllQuery()));
if (customTimestampField) {
searchRequestbuilder.addAggregation(terms("timestamps").field(timestampField));
}
SearchResponse searchResponse = searchRequestbuilder.get();
assertThat(searchResponse.getHits().totalHits(), equalTo(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
@ -106,28 +104,24 @@ public class IndexActionTests extends ESIntegTestCase {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
Terms terms = searchResponse.getAggregations().get("timestamps");
assertThat(terms, notNullValue());
assertThat(terms.getBuckets(), hasSize(1));
assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis()));
assertThat(terms.getBuckets().get(0).getDocCount(), is(1L));
} else {
assertThat(hit.getSource().size(), is(1));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
}
Terms terms = searchResponse.getAggregations().get("timestamps");
assertThat(terms, notNullValue());
assertThat(terms.getBuckets(), hasSize(1));
assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis()));
assertThat(terms.getBuckets().get(0).getDocCount(), is(1L));
}
public void testIndexActionExecuteMultiDoc() throws Exception {
String timestampField = randomFrom(null, "_timestamp", "@timestamp");
String timestampField = randomFrom(null, "@timestamp");
boolean customTimestampField = "@timestamp".equals(timestampField);
if (timestampField == null || "_timestamp".equals(timestampField)) {
assertAcked(prepareCreate("test-index")
.addMapping("test-type", "_timestamp", "enabled=true", "foo", "type=keyword"));
} else {
assertAcked(prepareCreate("test-index")
.addMapping("test-type", "foo", "type=keyword"));
}
assertAcked(prepareCreate("test-index")
.addMapping("test-type", "foo", "type=keyword"));
Object list = randomFrom(
new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") },
@ -160,8 +154,7 @@ public class IndexActionTests extends ESIntegTestCase {
SearchResponse searchResponse = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource().sort("foo", SortOrder.ASC)
.query(matchAllQuery())
.aggregation(terms("timestamps").field(customTimestampField ? timestampField : "_timestamp")))
.query(matchAllQuery()))
.get();
assertThat(searchResponse.getHits().totalHits(), equalTo(2L));

View File

@ -39,18 +39,14 @@ public class CompareConditionSearchTests extends AbstractWatcherIntegrationTestC
}
public void testExecuteWithAggs() throws Exception {
client().admin().indices().prepareCreate("my-index")
.addMapping("my-type", "_timestamp", "enabled=true")
.get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:00").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:10").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:20").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:30").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:00").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:10").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:20").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:30").get();
refresh();
SearchResponse response = client().prepareSearch("my-index")
.addAggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp")
.addAggregation(AggregationBuilders.dateHistogram("rate").field("@timestamp")
.dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.get();
@ -65,12 +61,12 @@ public class CompareConditionSearchTests extends AbstractWatcherIntegrationTestC
assertThat(resolvedValues.size(), is(1));
assertThat(resolvedValues, hasEntry("ctx.payload.aggregations.rate.buckets.0.doc_count", (Object) 4));
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:40").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setSource("@timestamp", "2005-01-01T00:40").get();
refresh();
response = client().prepareSearch("my-index")
.addAggregation(AggregationBuilders.dateHistogram("rate")
.field("_timestamp").dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.field("@timestamp").dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.get();
ctx = mockExecutionContext("_name", new Payload.XContent(response));

View File

@ -1,105 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.condition.script;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Script;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.junit.After;
import org.junit.Before;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;
/**
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/724")
public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTestCase {
private ThreadPool tp = null;
private ScriptServiceProxy scriptService;
@Before
public void init() throws Exception {
tp = new TestThreadPool(ThreadPool.Names.SAME);
scriptService = WatcherTestUtils.getScriptServiceProxy(tp);
}
@After
public void cleanup() {
tp.shutdownNow();
}
public void testExecuteWithAggs() throws Exception {
client().admin().indices().prepareCreate("my-index")
.addMapping("my-type", "_timestamp", "enabled=true")
.get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:00").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:10").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:20").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:30").setSource("{}").get();
refresh();
SearchResponse response = client().prepareSearch("my-index")
.addAggregation(AggregationBuilders.dateHistogram("rate")
.field("_timestamp").dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.get();
ExecutableScriptCondition condition = new ExecutableScriptCondition(
new ScriptCondition(Script.inline("ctx.payload.aggregations.rate.buckets[0]?.doc_count >= 5").build()),
logger, scriptService);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(condition.execute(ctx).met());
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:40").setSource("{}").get();
refresh();
response = client().prepareSearch("my-index")
.addAggregation(AggregationBuilders.dateHistogram("rate")
.field("_timestamp").dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
.get();
ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertThat(condition.execute(ctx).met(), is(true));
}
public void testExecuteAccessHits() throws Exception {
ExecutableScriptCondition condition = new ExecutableScriptCondition(
new ScriptCondition(Script.inline("ctx.payload.hits?.hits[0]?._score == 1.0").build()), logger, scriptService);
InternalSearchHit hit = new InternalSearchHit(0, "1", new Text("type"), null);
hit.score(1f);
hit.shard(new SearchShardTarget("a", new Index("a", "testUUID"), 0));
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
new InternalSearchHits(new InternalSearchHit[]{hit}, 1L, 1f), null, null, null, false, false);
SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_watch_name", new Payload.XContent(response));
assertThat(condition.execute(ctx).met(), is(true));
hit.score(2f);
when(ctx.payload()).thenReturn(new Payload.XContent(response));
assertThat(condition.execute(ctx).met(), is(false));
}
}

View File

@ -384,7 +384,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
timeWarp().clock().setTime(SystemClock.INSTANCE.nowUTC());
String watchName = "_name";
assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=text"));
assertAcked(prepareCreate("events").addMapping("event", "level", "type=text"));
watcherClient().preparePutWatch(watchName)
.setSource(watchBuilder()