diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java index 885baa613dc..08c9f5efb60 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/ExecutableIndexAction.java @@ -83,7 +83,7 @@ public class ExecutableIndexAction extends ExecutableAction { return new IndexAction.Result.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(), XContentType.JSON)); } - IndexResponse response = client.index(indexRequest); + IndexResponse response = client.index(indexRequest, action.timeout); XContentBuilder jsonBuilder = jsonBuilder(); indexResponseToXContent(jsonBuilder, response); return new IndexAction.Result.Success(new XContentSource(jsonBuilder)); @@ -110,7 +110,7 @@ public class ExecutableIndexAction extends ExecutableAction { indexRequest.source(jsonBuilder().prettyPrint().map(doc)); bulkRequest.add(indexRequest); } - BulkResponse bulkResponse = client.bulk(bulkRequest); + BulkResponse bulkResponse = client.bulk(bulkRequest, action.timeout); XContentBuilder jsonBuilder = jsonBuilder().startArray(); for (BulkItemResponse item : bulkResponse) { IndexResponse response = item.getResponse(); diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java index 1e562f2043f..57e13f826c7 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java @@ -7,12 +7,12 @@ package org.elasticsearch.watcher.actions.index; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.actions.Action; import org.elasticsearch.watcher.support.DynamicIndexName; +import org.elasticsearch.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.watcher.support.xcontent.XContentSource; import java.io.IOException; @@ -27,11 +27,13 @@ public class IndexAction implements Action { final String index; final String docType; final @Nullable String executionTimeField; + final @Nullable TimeValue timeout; - public IndexAction(String index, String docType, @Nullable String executionTimeField) { + public IndexAction(String index, String docType, @Nullable String executionTimeField, @Nullable TimeValue timeout) { this.index = index; this.docType = docType; this.executionTimeField = executionTimeField; + this.timeout = timeout; } @Override @@ -79,13 +81,17 @@ public class IndexAction implements Action { if (executionTimeField != null) { builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField); } + if (timeout != null) { + builder.field(Field.TIMEOUT.getPreferredName(), timeout); + }; return builder.endObject(); } - public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException { + public static IndexAction parse(String watchId, String actionId, XContentParser parser, TimeValue defaultTimeout) throws IOException { String index = null; String docType = null; String executionTimeField = null; + TimeValue timeout = defaultTimeout; String currentFieldName = null; XContentParser.Token token; @@ -103,6 +109,8 @@ public class IndexAction implements Action { docType = parser.text(); } else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) { executionTimeField = parser.text(); + } else if (Field.TIMEOUT.match(currentFieldName)) { + timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString()); } else { throw new IndexActionException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName); } @@ -119,7 +127,7 @@ public class IndexAction implements Action { throw new IndexActionException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, actionId, Field.DOC_TYPE.getPreferredName()); } - return new IndexAction(index, docType, executionTimeField); + return new IndexAction(index, docType, executionTimeField, timeout); } public static Builder builder(String index, String docType) { @@ -192,6 +200,7 @@ public class IndexAction implements Action { final String index; final String docType; String executionTimeField; + TimeValue timeout; private Builder(String index, String docType) { this.index = index; @@ -203,9 +212,14 @@ public class IndexAction implements Action { return this; } + public Builder timeout(TimeValue writeTimeout) { + this.timeout = writeTimeout; + return this; + } + @Override public IndexAction build() { - return new IndexAction(index, docType, executionTimeField); + return new IndexAction(index, docType, executionTimeField, timeout); } } @@ -216,5 +230,6 @@ public class IndexAction implements Action { ParseField SOURCE = new ParseField("source"); ParseField RESPONSE = new ParseField("response"); ParseField REQUEST = new ParseField("request"); + ParseField TIMEOUT = new ParseField("timeout"); } } diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java b/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java index e8d8f62e72a..511d8d44ad5 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java +++ b/src/main/java/org/elasticsearch/watcher/actions/index/IndexActionFactory.java @@ -8,6 +8,7 @@ package org.elasticsearch.watcher.actions.index; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.actions.ActionFactory; import org.elasticsearch.watcher.actions.email.ExecutableEmailAction; @@ -23,6 +24,7 @@ public class IndexActionFactory extends ActionFactory successFullSlots = new ArrayList<>(); for (int i = 0; i < response.getItems().length; i++) { BulkItemResponse itemResponse = response.getItems()[i]; @@ -276,7 +276,7 @@ public class TriggeredWatchStore extends AbstractComponent { } SearchRequest searchRequest = createScanSearchRequest(); - SearchResponse response = client.search(searchRequest); + SearchResponse response = client.search(searchRequest, null); List triggeredWatches = new ArrayList<>(); try { if (response.getTotalShards() != response.getSuccessfulShards()) { diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index cb9e3dcaafe..70b78e526ea 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.settings.ClusterDynamicSettings; import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -166,7 +167,7 @@ public class HistoryStore extends AbstractComponent implements NodeSettingsServi IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) .source(XContentFactory.jsonBuilder().value(watchRecord)) .opType(IndexRequest.OpType.CREATE); - client.index(request); + client.index(request, (TimeValue) null); } catch (IOException e) { throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e); } finally { diff --git a/src/main/java/org/elasticsearch/watcher/input/search/ExecutableSearchInput.java b/src/main/java/org/elasticsearch/watcher/input/search/ExecutableSearchInput.java index 48df4b984fd..358e6c03bac 100644 --- a/src/main/java/org/elasticsearch/watcher/input/search/ExecutableSearchInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/search/ExecutableSearchInput.java @@ -65,7 +65,7 @@ public class ExecutableSearchInput extends ExecutableInput extractKeys; + private final @Nullable TimeValue timeout; - public SearchInput(SearchRequest searchRequest, @Nullable Set extractKeys) { + public SearchInput(SearchRequest searchRequest, @Nullable Set extractKeys, @Nullable TimeValue timeout) { this.searchRequest = searchRequest; this.extractKeys = extractKeys; + this.timeout = timeout; } @Override @@ -68,6 +72,10 @@ public class SearchInput implements Input { return extractKeys; } + public TimeValue getTimeout() { + return timeout; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -76,13 +84,17 @@ public class SearchInput implements Input { if (extractKeys != null) { builder.field(Field.EXTRACT.getPreferredName(), extractKeys); } + if (timeout != null) { + builder.field(Field.TIMEOUT.getPreferredName(), timeout); + } builder.endObject(); return builder; } - public static SearchInput parse(String watchId, XContentParser parser) throws IOException { + public static SearchInput parse(String watchId, XContentParser parser, TimeValue defaultTimeout) throws IOException { SearchRequest request = null; Set extract = null; + TimeValue timeout = defaultTimeout; String currentFieldName = null; XContentParser.Token token; @@ -108,6 +120,8 @@ public class SearchInput implements Input { } else { throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName); } + } else if (Field.TIMEOUT.match(currentFieldName)) { + timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString()); } else { throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected token [{}]", TYPE, watchId, token); } @@ -116,7 +130,7 @@ public class SearchInput implements Input { if (request == null) { throw new SearchInputException("could not parse [{}] input for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName()); } - return new SearchInput(request, extract); + return new SearchInput(request, extract, timeout); } public static Builder builder(SearchRequest request) { @@ -157,6 +171,7 @@ public class SearchInput implements Input { private final SearchRequest request; private final ImmutableSet.Builder extractKeys = ImmutableSet.builder(); + private TimeValue timeout; private Builder(SearchRequest request) { this.request = request; @@ -172,15 +187,21 @@ public class SearchInput implements Input { return this; } + public Builder timeout(TimeValue readTimeout) { + this.timeout = readTimeout; + return this; + } + @Override public SearchInput build() { Set keys = extractKeys.build(); - return new SearchInput(request, keys.isEmpty() ? null : keys); + return new SearchInput(request, keys.isEmpty() ? null : keys, timeout); } } public interface Field extends Input.Field { ParseField REQUEST = new ParseField("request"); ParseField EXTRACT = new ParseField("extract"); + ParseField TIMEOUT = new ParseField("timeout"); } } diff --git a/src/main/java/org/elasticsearch/watcher/input/search/SearchInputFactory.java b/src/main/java/org/elasticsearch/watcher/input/search/SearchInputFactory.java index 1411a7f79c9..389ffbfb80a 100644 --- a/src/main/java/org/elasticsearch/watcher/input/search/SearchInputFactory.java +++ b/src/main/java/org/elasticsearch/watcher/input/search/SearchInputFactory.java @@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input.search; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.input.InputFactory; import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; @@ -23,6 +24,7 @@ public class SearchInputFactory extends InputFactory listener) { @@ -83,31 +94,34 @@ public class ClientProxy implements InitializingService.Initializable { } public DeleteResponse delete(DeleteRequest request) { - return client.delete(preProcess(request)).actionGet(); + return client.delete(preProcess(request)).actionGet(defaultIndexTimeout); } - public SearchResponse search(SearchRequest request) { - return client.search(preProcess(request)).actionGet(5, TimeUnit.SECONDS); + public SearchResponse search(SearchRequest request, TimeValue timeout) { + if (timeout == null) { + timeout = defaultSearchTimeout; + } + return client.search(preProcess(request)).actionGet(timeout); } public SearchResponse searchScroll(String scrollId, TimeValue timeout) { SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout); - return client.searchScroll(preProcess(request)).actionGet(); + return client.searchScroll(preProcess(request)).actionGet(defaultSearchTimeout); } public ClearScrollResponse clearScroll(String scrollId) { ClearScrollRequest request = new ClearScrollRequest(); request.addScrollId(scrollId); - return client.clearScroll(preProcess(request)).actionGet(); + return client.clearScroll(preProcess(request)).actionGet(defaultSearchTimeout); } public RefreshResponse refresh(RefreshRequest request) { - return client.admin().indices().refresh(preProcess(request)).actionGet(); + return client.admin().indices().refresh(preProcess(request)).actionGet(defaultSearchTimeout); } public PutIndexTemplateResponse putTemplate(PutIndexTemplateRequest request) { preProcess(request); - return client.admin().indices().putTemplate(request).actionGet(); + return client.admin().indices().putTemplate(request).actionGet(defaultIndexTimeout); } M preProcess(M message) { diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java index a063b092a83..df1e36fe5b9 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java @@ -30,7 +30,7 @@ public class ExecutableSearchTransform extends ExecutableTransform { private final SearchRequest request; + private TimeValue timeout; public Builder(SearchRequest request) { this.request = request; } + public Builder timeout(TimeValue readTimeout) { + this.timeout = readTimeout; + return this; + } + @Override public SearchTransform build() { - return new SearchTransform(request); + return new SearchTransform(request, timeout); } } public interface Field extends Transform.Field { ParseField REQUEST = new ParseField("request"); + ParseField TIMEOUT = new ParseField("timeout"); } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java index 6918937bb24..d8c1dc03afa 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java +++ b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java @@ -8,6 +8,7 @@ package org.elasticsearch.watcher.transform.search; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.support.DynamicIndexName; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; @@ -22,6 +23,7 @@ public class SearchTransformFactory extends TransformFactoryany())).thenReturn(indexResponse); historyStore.put(watchRecord); - verify(clientProxy).index(Matchers.any()); + verify(clientProxy).index(Matchers.any(), Matchers.any()); } @Test(expected = HistoryException.class) diff --git a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index 5e4d8bf7fce..e7275060cfd 100644 --- a/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.input.search; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest; import org.elasticsearch.action.search.SearchRequest; @@ -13,6 +12,7 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -115,7 +115,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .request() .source(searchSourceBuilder); - ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); + ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); WatchExecutionContext ctx = new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), @@ -222,7 +222,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .request() .source(searchSourceBuilder); - ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); + ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser()); WatchExecutionContext ctx = new TriggeredExecutionContext( new Watch("test-watch", new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), @@ -253,7 +253,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .source(searchSource() .query(filteredQuery(matchQuery("event_type", "a"), rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")))); - XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null)); + TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null; + XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, timeout)); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); @@ -261,6 +262,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { SearchInput searchInput = factory.parseInput("_id", parser); assertEquals(SearchInput.TYPE, searchInput.type()); + assertThat(searchInput.getTimeout(), equalTo(timeout != null ? timeout : TimeValue.timeValueSeconds(30))); // 30s is the default } @Test @@ -272,7 +274,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .source(searchSource() .query(boolQuery().must(matchQuery("event_type", "a")).filter(rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")))); - XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null)); + XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null)); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); @@ -308,7 +310,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { .source(searchSource() .query(filteredQuery(matchQuery("event_type", "a"), rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}")))); - XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null)); + XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null)); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index 94652fdd8e4..fb73b92ad5b 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -223,7 +223,7 @@ public final class WatcherTestUtils { new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger), new ExecutableScriptCondition(new ScriptCondition(Script.inline("return true").build()), logger, scriptService), - new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client, new DynamicIndexName.Parser()), + new ExecutableSearchTransform(new SearchTransform(transformRequest, null), logger, client, new DynamicIndexName.Parser()), new TimeValue(0), new ExecutableActions(actions), metadata, diff --git a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java index feb5f20e782..2ce8d89fa07 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -245,6 +246,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { SearchType searchType = getRandomSupportedSearchType(); String templateName = randomBoolean() ? null : "template1"; XContentBuilder builder = jsonBuilder().startObject(); + builder.startObject("request"); if (indices != null) { builder.array("indices", indices); } @@ -272,6 +274,11 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .endObject(); builder.endObject(); + TimeValue readTimeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null; + if (readTimeout != null) { + builder.field("timeout", readTimeout); + } + builder.endObject(); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); @@ -292,12 +299,14 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { assertThat(executable.transform().getRequest().templateSource().toUtf8(), equalTo("{\"file\":\"template1\"}")); } assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes())); + assertThat(executable.transform().getTimeout(), equalTo(readTimeout != null ? readTimeout : TimeValue.timeValueSeconds(30))); // 30s is the default } @Test public void testParser_WithIndexNames() throws Exception { SearchType searchType = getRandomSupportedSearchType(); XContentBuilder builder = jsonBuilder().startObject(); + builder.startObject("request"); builder.array("indices", "idx", ""); if (searchType != null) { builder.field("search_type", searchType.name()); @@ -311,6 +320,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { .endObject(); builder.endObject(); + builder.endObject(); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java index 8e6c7ca6182..943929662e8 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java @@ -133,7 +133,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { } verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); - verify(clientProxy, never()).search(any(SearchRequest.class)); + verify(clientProxy, never()).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, never()).clearScroll(anyString()); } @@ -160,7 +160,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); SearchResponse searchResponse = mockSearchResponse(1, 0, 0); - when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse); when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); @@ -173,7 +173,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { } verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); - verify(clientProxy, times(1)).search(any(SearchRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); } @@ -200,7 +200,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); SearchResponse searchResponse = mockSearchResponse(1, 1, 0); - when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse); when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); @@ -211,7 +211,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { assertThat(watchStore.watches().size(), equalTo(0)); verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); - verify(clientProxy, times(1)).search(any(SearchRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); } @@ -238,7 +238,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2); - when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse1); BytesReference source = new BytesArray("{}"); InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new StringText("type"), Collections.emptyMap()); @@ -266,7 +266,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { assertThat(watchStore.watches().size(), equalTo(2)); verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); - verify(clientProxy, times(1)).search(any(SearchRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); } diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 7c988d448ad..ffd7fe82f9d 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -358,17 +358,18 @@ public class WatchTests extends ElasticsearchTestCase { private ExecutableTransform randomTransform() { String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE); + TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(5) : null; switch (type) { case ScriptTransform.TYPE: return new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService); case SearchTransform.TYPE: - return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser); + return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), logger, client, indexNameParser); default: // chain ChainTransform chainTransform = new ChainTransform(ImmutableList.of( - new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), + new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), new ScriptTransform(Script.inline("_script").build()))); return new ExecutableChainTransform(chainTransform, logger, ImmutableList.of( - new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser), + new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), logger, client, indexNameParser), new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService))); } } @@ -392,7 +393,7 @@ public class WatchTests extends ElasticsearchTestCase { list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer))); } if (randomBoolean()) { - IndexAction action = new IndexAction("_index", "_type", null); + IndexAction action = new IndexAction("_index", "_type", null, null); list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(action, logger, client, indexNameParser))); } if (randomBoolean()) {