Added timeout option to the `search` transform, `search` input and `index` action.

Added default timeout setting:
* `watcher.action.index.default_timeout` sets the timeout for `index` action, if no timeout is defined in the `index` action itself.
* `watcher.input.search.default_timeout` sets the timeout for `search` input, if no timeout is defined in the `search` input itself.
* `watcher.transform.search.default_timeout`  sets the timeout for `search` transform, if no timeout is defined in the `search` transform itself.

Added general Watcher wide default timeout settings to the operations exposed in the client proxy:
* `watcher.internal.ops.search.default_timeout` for search related operations.
* `watcher.internal.ops.index.default_timeout` for index operations.
* `watcher.internal.ops.bulk.default_timeout` for bulk operations.

Original commit: elastic/x-pack-elasticsearch@5a3ef35a9d
This commit is contained in:
Martijn van Groningen 2015-06-23 16:27:17 +02:00
parent 591ea500f2
commit 193d8092db
21 changed files with 205 additions and 79 deletions

View File

@ -83,7 +83,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
return new IndexAction.Result.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(), XContentType.JSON));
}
IndexResponse response = client.index(indexRequest);
IndexResponse response = client.index(indexRequest, action.timeout);
XContentBuilder jsonBuilder = jsonBuilder();
indexResponseToXContent(jsonBuilder, response);
return new IndexAction.Result.Success(new XContentSource(jsonBuilder));
@ -110,7 +110,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
indexRequest.source(jsonBuilder().prettyPrint().map(doc));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client.bulk(bulkRequest);
BulkResponse bulkResponse = client.bulk(bulkRequest, action.timeout);
XContentBuilder jsonBuilder = jsonBuilder().startArray();
for (BulkItemResponse item : bulkResponse) {
IndexResponse response = item.getResponse();

View File

@ -7,12 +7,12 @@ package org.elasticsearch.watcher.actions.index;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import java.io.IOException;
@ -27,11 +27,13 @@ public class IndexAction implements Action {
final String index;
final String docType;
final @Nullable String executionTimeField;
final @Nullable TimeValue timeout;
public IndexAction(String index, String docType, @Nullable String executionTimeField) {
public IndexAction(String index, String docType, @Nullable String executionTimeField, @Nullable TimeValue timeout) {
this.index = index;
this.docType = docType;
this.executionTimeField = executionTimeField;
this.timeout = timeout;
}
@Override
@ -79,13 +81,17 @@ public class IndexAction implements Action {
if (executionTimeField != null) {
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
}
if (timeout != null) {
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
};
return builder.endObject();
}
public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException {
public static IndexAction parse(String watchId, String actionId, XContentParser parser, TimeValue defaultTimeout) throws IOException {
String index = null;
String docType = null;
String executionTimeField = null;
TimeValue timeout = defaultTimeout;
String currentFieldName = null;
XContentParser.Token token;
@ -103,6 +109,8 @@ public class IndexAction implements Action {
docType = parser.text();
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) {
executionTimeField = parser.text();
} else if (Field.TIMEOUT.match(currentFieldName)) {
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
} else {
throw new IndexActionException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName);
}
@ -119,7 +127,7 @@ public class IndexAction implements Action {
throw new IndexActionException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, actionId, Field.DOC_TYPE.getPreferredName());
}
return new IndexAction(index, docType, executionTimeField);
return new IndexAction(index, docType, executionTimeField, timeout);
}
public static Builder builder(String index, String docType) {
@ -192,6 +200,7 @@ public class IndexAction implements Action {
final String index;
final String docType;
String executionTimeField;
TimeValue timeout;
private Builder(String index, String docType) {
this.index = index;
@ -203,9 +212,14 @@ public class IndexAction implements Action {
return this;
}
public Builder timeout(TimeValue writeTimeout) {
this.timeout = writeTimeout;
return this;
}
@Override
public IndexAction build() {
return new IndexAction(index, docType, executionTimeField);
return new IndexAction(index, docType, executionTimeField, timeout);
}
}
@ -216,5 +230,6 @@ public class IndexAction implements Action {
ParseField SOURCE = new ParseField("source");
ParseField RESPONSE = new ParseField("response");
ParseField REQUEST = new ParseField("request");
ParseField TIMEOUT = new ParseField("timeout");
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.actions.index;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.actions.ActionFactory;
import org.elasticsearch.watcher.actions.email.ExecutableEmailAction;
@ -23,6 +24,7 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
private final ClientProxy client;
private final DynamicIndexName.Parser indexNamesParser;
private final TimeValue defaultTimeout;
@Inject
public IndexActionFactory(Settings settings, ClientProxy client) {
@ -30,6 +32,7 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
this.client = client;
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.actions.index");
this.indexNamesParser = new DynamicIndexName.Parser(defaultDateFormat);
this.defaultTimeout = settings.getAsTime("watcher.action.index.default_timeout", TimeValue.timeValueSeconds(60));
}
@Override
@ -39,7 +42,7 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
@Override
public IndexAction parseAction(String watchId, String actionId, XContentParser parser) throws IOException {
return IndexAction.parse(watchId, actionId, parser);
return IndexAction.parse(watchId, actionId, parser, defaultTimeout);
}
@Override

View File

@ -131,7 +131,7 @@ public class TriggeredWatchStore extends AbstractComponent {
IndexRequest request = new IndexRequest(INDEX_NAME, DOC_TYPE, triggeredWatch.id().value())
.source(XContentFactory.jsonBuilder().value(triggeredWatch))
.opType(IndexRequest.OpType.CREATE);
client.index(request);
client.index(request, (TimeValue) null);
} catch (IOException e) {
throw new TriggeredWatchException("failed to persist triggered watch [{}]", e, triggeredWatch);
} finally {
@ -228,7 +228,7 @@ public class TriggeredWatchStore extends AbstractComponent {
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
}
BulkResponse response = client.bulk(request);
BulkResponse response = client.bulk(request, (TimeValue) null);
List<Integer> successFullSlots = new ArrayList<>();
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
@ -276,7 +276,7 @@ public class TriggeredWatchStore extends AbstractComponent {
}
SearchRequest searchRequest = createScanSearchRequest();
SearchResponse response = client.search(searchRequest);
SearchResponse response = client.search(searchRequest, null);
List<TriggeredWatch> triggeredWatches = new ArrayList<>();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@ -166,7 +167,7 @@ public class HistoryStore extends AbstractComponent implements NodeSettingsServi
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(XContentFactory.jsonBuilder().value(watchRecord))
.opType(IndexRequest.OpType.CREATE);
client.index(request);
client.index(request, (TimeValue) null);
} catch (IOException e) {
throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e);
} finally {

View File

@ -65,7 +65,7 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), XContentHelper.convertToJson(source, false, true));
}
SearchResponse response = client.search(request);
SearchResponse response = client.search(request, input.getTimeout());
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());

View File

@ -9,11 +9,13 @@ import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
import org.elasticsearch.watcher.support.SearchRequestParseException;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.watch.Payload;
@ -31,10 +33,12 @@ public class SearchInput implements Input {
private final SearchRequest searchRequest;
private final @Nullable Set<String> extractKeys;
private final @Nullable TimeValue timeout;
public SearchInput(SearchRequest searchRequest, @Nullable Set<String> extractKeys) {
public SearchInput(SearchRequest searchRequest, @Nullable Set<String> extractKeys, @Nullable TimeValue timeout) {
this.searchRequest = searchRequest;
this.extractKeys = extractKeys;
this.timeout = timeout;
}
@Override
@ -68,6 +72,10 @@ public class SearchInput implements Input {
return extractKeys;
}
public TimeValue getTimeout() {
return timeout;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -76,13 +84,17 @@ public class SearchInput implements Input {
if (extractKeys != null) {
builder.field(Field.EXTRACT.getPreferredName(), extractKeys);
}
if (timeout != null) {
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
}
builder.endObject();
return builder;
}
public static SearchInput parse(String watchId, XContentParser parser) throws IOException {
public static SearchInput parse(String watchId, XContentParser parser, TimeValue defaultTimeout) throws IOException {
SearchRequest request = null;
Set<String> extract = null;
TimeValue timeout = defaultTimeout;
String currentFieldName = null;
XContentParser.Token token;
@ -108,6 +120,8 @@ public class SearchInput implements Input {
} else {
throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName);
}
} else if (Field.TIMEOUT.match(currentFieldName)) {
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
} else {
throw new SearchInputException("could not parse [{}] input for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
}
@ -116,7 +130,7 @@ public class SearchInput implements Input {
if (request == null) {
throw new SearchInputException("could not parse [{}] input for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
}
return new SearchInput(request, extract);
return new SearchInput(request, extract, timeout);
}
public static Builder builder(SearchRequest request) {
@ -157,6 +171,7 @@ public class SearchInput implements Input {
private final SearchRequest request;
private final ImmutableSet.Builder<String> extractKeys = ImmutableSet.builder();
private TimeValue timeout;
private Builder(SearchRequest request) {
this.request = request;
@ -172,15 +187,21 @@ public class SearchInput implements Input {
return this;
}
public Builder timeout(TimeValue readTimeout) {
this.timeout = readTimeout;
return this;
}
@Override
public SearchInput build() {
Set<String> keys = extractKeys.build();
return new SearchInput(request, keys.isEmpty() ? null : keys);
return new SearchInput(request, keys.isEmpty() ? null : keys, timeout);
}
}
public interface Field extends Input.Field {
ParseField REQUEST = new ParseField("request");
ParseField EXTRACT = new ParseField("extract");
ParseField TIMEOUT = new ParseField("timeout");
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input.search;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
@ -23,6 +24,7 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
private final ClientProxy client;
private final DynamicIndexName.Parser indexNameParser;
private final TimeValue defaultTimeout;
@Inject
public SearchInputFactory(Settings settings, ClientProxy client) {
@ -30,6 +32,7 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
this.client = client;
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.input.search");
this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat);
this.defaultTimeout = settings.getAsTime("watcher.input.search.default_timeout", TimeValue.timeValueSeconds(30));
}
@Override
@ -39,7 +42,7 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
@Override
public SearchInput parseInput(String watchId, XContentParser parser) throws IOException {
return SearchInput.parse(watchId, parser);
return SearchInput.parse(watchId, parser, defaultTimeout);
}
@Override

View File

@ -23,13 +23,12 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.watcher.shield.ShieldIntegration;
import org.elasticsearch.watcher.support.init.InitializingService;
import java.util.concurrent.TimeUnit;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
* needs to injected to be avoid circular dependencies issues.
@ -37,18 +36,24 @@ import java.util.concurrent.TimeUnit;
public class ClientProxy implements InitializingService.Initializable {
private final ShieldIntegration shieldIntegration;
private final TimeValue defaultSearchTimeout;
private final TimeValue defaultIndexTimeout;
private final TimeValue defaultBulkTimeout;
private Client client;
@Inject
public ClientProxy(ShieldIntegration shieldIntegration) {
public ClientProxy(Settings settings, ShieldIntegration shieldIntegration) {
this.shieldIntegration = shieldIntegration;
defaultSearchTimeout = settings.getAsTime("watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
defaultIndexTimeout = settings.getAsTime("watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(60));
defaultBulkTimeout = settings.getAsTime("watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
}
/**
* Creates a proxy to the given client (can be used for testing)
*/
public static ClientProxy of(Client client) {
ClientProxy proxy = new ClientProxy(null);
ClientProxy proxy = new ClientProxy(Settings.EMPTY, null);
proxy.client = client;
return proxy;
}
@ -62,16 +67,22 @@ public class ClientProxy implements InitializingService.Initializable {
return client.admin();
}
public IndexResponse index(IndexRequest request) {
return client.index(preProcess(request)).actionGet();
public IndexResponse index(IndexRequest request, TimeValue timeout) {
if (timeout == null) {
timeout = defaultIndexTimeout;
}
return client.index(preProcess(request)).actionGet(timeout);
}
public UpdateResponse update(UpdateRequest request) {
return client.update(preProcess(request)).actionGet();
return client.update(preProcess(request)).actionGet(defaultIndexTimeout);
}
public BulkResponse bulk(BulkRequest request) {
return client.bulk(preProcess(request)).actionGet();
public BulkResponse bulk(BulkRequest request, TimeValue timeout) {
if (timeout == null) {
timeout = defaultBulkTimeout;
}
return client.bulk(preProcess(request)).actionGet(timeout);
}
public void index(IndexRequest request, ActionListener<IndexResponse> listener) {
@ -83,31 +94,34 @@ public class ClientProxy implements InitializingService.Initializable {
}
public DeleteResponse delete(DeleteRequest request) {
return client.delete(preProcess(request)).actionGet();
return client.delete(preProcess(request)).actionGet(defaultIndexTimeout);
}
public SearchResponse search(SearchRequest request) {
return client.search(preProcess(request)).actionGet(5, TimeUnit.SECONDS);
public SearchResponse search(SearchRequest request, TimeValue timeout) {
if (timeout == null) {
timeout = defaultSearchTimeout;
}
return client.search(preProcess(request)).actionGet(timeout);
}
public SearchResponse searchScroll(String scrollId, TimeValue timeout) {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout);
return client.searchScroll(preProcess(request)).actionGet();
return client.searchScroll(preProcess(request)).actionGet(defaultSearchTimeout);
}
public ClearScrollResponse clearScroll(String scrollId) {
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
return client.clearScroll(preProcess(request)).actionGet();
return client.clearScroll(preProcess(request)).actionGet(defaultSearchTimeout);
}
public RefreshResponse refresh(RefreshRequest request) {
return client.admin().indices().refresh(preProcess(request)).actionGet();
return client.admin().indices().refresh(preProcess(request)).actionGet(defaultSearchTimeout);
}
public PutIndexTemplateResponse putTemplate(PutIndexTemplateRequest request) {
preProcess(request);
return client.admin().indices().putTemplate(request).actionGet();
return client.admin().indices().putTemplate(request).actionGet(defaultIndexTimeout);
}
<M extends TransportMessage> M preProcess(M message) {

View File

@ -30,7 +30,7 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ClientProxy client, DynamicIndexName.Parser indexNameParser) {
super(transform, logger);
this.client = client;
String[] indices = transform.request.indices();
String[] indices = transform.getRequest().indices();
this.indexNames = indices != null ? indexNameParser.parse(indices) : null;
}
@ -42,8 +42,8 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) {
SearchRequest request = null;
try {
request = WatcherUtils.createSearchRequestFromPrototype(transform.request, indexNames, ctx, payload);
SearchResponse resp = client.search(request);
request = WatcherUtils.createSearchRequestFromPrototype(transform.getRequest(), indexNames, ctx, payload);
SearchResponse resp = client.search(request, transform.getTimeout());
return new SearchTransform.Result(request, new Payload.XContent(resp));
} catch (Exception e) {
logger.error("failed to execute [{}] transform for [{}]", e, SearchTransform.TYPE, ctx.id());

View File

@ -8,10 +8,12 @@ package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
import org.elasticsearch.watcher.support.SearchRequestParseException;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.watch.Payload;
@ -25,10 +27,12 @@ public class SearchTransform implements Transform {
public static final String TYPE = "search";
protected final SearchRequest request;
private final SearchRequest request;
private final @Nullable TimeValue timeout;
public SearchTransform(SearchRequest request) {
public SearchTransform(SearchRequest request, @Nullable TimeValue timeout) {
this.request = request;
this.timeout = timeout;
}
@Override
@ -40,6 +44,10 @@ public class SearchTransform implements Transform {
return request;
}
public TimeValue getTimeout() {
return timeout;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -57,16 +65,42 @@ public class SearchTransform implements Transform {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return WatcherUtils.writeSearchRequest(request, builder, params);
builder.startObject();
builder.field(Field.REQUEST.getPreferredName());
builder = WatcherUtils.writeSearchRequest(request, builder, params);
if (timeout != null) {
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
}
builder.endObject();
return builder;
}
public static SearchTransform parse(String watchId, XContentParser parser) throws IOException {
try {
SearchRequest request = WatcherUtils.readSearchRequest(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
return new SearchTransform(request);
} catch (SearchRequestParseException srpe) {
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. failed parsing search request", srpe, TYPE, watchId);
public static SearchTransform parse(String watchId, XContentParser parser, TimeValue defaultTimeout) throws IOException {
SearchRequest request = null;
TimeValue timeout = defaultTimeout;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (Field.REQUEST.match(currentFieldName)) {
try {
request = WatcherUtils.readSearchRequest(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
} catch (SearchRequestParseException srpe) {
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. failed to parse [{}]", srpe, TYPE, watchId, currentFieldName);
}
} else if (Field.TIMEOUT.match(currentFieldName)) {
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
} else {
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. unexpected field [{}]", TYPE, watchId, currentFieldName);
}
}
if (request == null) {
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. missing required [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
}
return new SearchTransform(request, timeout);
}
public static Builder builder(SearchRequest request) {
@ -106,18 +140,25 @@ public class SearchTransform implements Transform {
public static class Builder implements Transform.Builder<SearchTransform> {
private final SearchRequest request;
private TimeValue timeout;
public Builder(SearchRequest request) {
this.request = request;
}
public Builder timeout(TimeValue readTimeout) {
this.timeout = readTimeout;
return this;
}
@Override
public SearchTransform build() {
return new SearchTransform(request);
return new SearchTransform(request, timeout);
}
}
public interface Field extends Transform.Field {
ParseField REQUEST = new ParseField("request");
ParseField TIMEOUT = new ParseField("timeout");
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
@ -22,6 +23,7 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
protected final ClientProxy client;
protected final DynamicIndexName.Parser indexNameParser;
private final TimeValue defaultTimeout;
@Inject
public SearchTransformFactory(Settings settings, ClientProxy client) {
@ -29,6 +31,7 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
this.client = client;
String defaultDateFormat = DynamicIndexName.defaultDateFormat(settings, "watcher.transform.search");
this.indexNameParser = new DynamicIndexName.Parser(defaultDateFormat);
this.defaultTimeout = settings.getAsTime("watcher.transform.search.default_timeout", TimeValue.timeValueSeconds(30));
}
@Override
@ -38,7 +41,7 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
@Override
public SearchTransform parseTransform(String watchId, XContentParser parser) throws IOException {
return SearchTransform.parse(watchId, parser);
return SearchTransform.parse(watchId, parser, defaultTimeout);
}
@Override

View File

@ -132,7 +132,7 @@ public class WatchStore extends AbstractComponent {
public WatchPut put(Watch watch) {
ensureStarted();
IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes(), Versions.MATCH_ANY);
IndexResponse response = client.index(indexRequest);
IndexResponse response = client.index(indexRequest, (TimeValue) null);
watch.status().version(response.getVersion());
watch.version(response.getVersion());
Watch previous = watches.put(watch.id(), watch);
@ -225,7 +225,7 @@ public class WatchStore extends AbstractComponent {
.source(new SearchSourceBuilder()
.size(scrollSize)
.version(true));
SearchResponse response = client.search(searchRequest);
SearchResponse response = client.search(searchRequest, null);
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
throw new ElasticsearchException("Partial response while loading watches");

View File

@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -72,7 +73,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
.get().isAcknowledged(), is(true));
}
IndexAction action = new IndexAction("test-index", "test-type", timestampField);
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
DateTime executionTime = DateTime.now(UTC);
Payload payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", ImmutableMap.of("foo", "bar"));
@ -135,7 +136,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
ImmutableSet.of(ImmutableMap.of("foo", "bar"), ImmutableMap.of("foo", "bar1"))
);
IndexAction action = new IndexAction("test-index", "test-type", timestampField);
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
DateTime executionTime = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list));
@ -197,6 +198,10 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
if (timestampField != null) {
builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField);
}
TimeValue writeTimeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null;
if (writeTimeout != null) {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout);
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.of(client()));
@ -210,6 +215,12 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
if (timestampField != null) {
assertThat(executable.action().executionTimeField, equalTo(timestampField));
}
if (writeTimeout != null) {
assertThat(executable.action().timeout, equalTo(writeTimeout));
} else {
// default:
assertThat(executable.action().timeout, equalTo(TimeValue.timeValueSeconds(60)));
}
}
@Test

View File

@ -182,7 +182,7 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getSuccessfulShards()).thenReturn(0);
when(searchResponse.getTotalShards()).thenReturn(1);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
@ -194,7 +194,7 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
assertThat(e.getMessage(), equalTo("scan search was supposed to run on [1] shards, but ran on [0] shards"));
}
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
@ -227,7 +227,7 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
when(searchResponse.getSuccessfulShards()).thenReturn(1);
when(searchResponse.getTotalShards()).thenReturn(1);
when(searchResponse.getHits()).thenReturn(InternalSearchHits.empty());
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
@ -237,7 +237,7 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
assertThat(triggeredWatches, hasSize(0));
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
@ -276,7 +276,7 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit}, 1, 1.0f);
when(searchResponse1.getHits()).thenReturn(hits);
when(searchResponse1.getScrollId()).thenReturn("_scrollId");
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse1);
// First return a scroll response with a single hit and then with no hits
SearchResponse searchResponse2 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId", 1, 1, 1, null);
@ -294,7 +294,7 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
assertThat(triggeredWatches, hasSize(1));
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -53,9 +54,9 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
IndexResponse indexResponse = mock(IndexResponse.class);
IndexRequest indexRequest = indexRequest(".watch_history-1970.01.01", HistoryStore.DOC_TYPE, wid.value(), IndexRequest.OpType.CREATE);
when(clientProxy.index(indexRequest)).thenReturn(indexResponse);
when(clientProxy.index(indexRequest, Matchers.<TimeValue>any())).thenReturn(indexResponse);
historyStore.put(watchRecord);
verify(clientProxy).index(Matchers.<IndexRequest>any());
verify(clientProxy).index(Matchers.<IndexRequest>any(), Matchers.<TimeValue>any());
}
@Test(expected = HistoryException.class)

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.input.search;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -13,6 +12,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -115,7 +115,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -222,7 +222,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.request()
.source(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null), logger, ClientProxy.of(client()), new DynamicIndexName.Parser());
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -253,7 +253,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.source(searchSource()
.query(filteredQuery(matchQuery("event_type", "a"), rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null));
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null;
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, timeout));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -261,6 +262,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
SearchInput searchInput = factory.parseInput("_id", parser);
assertEquals(SearchInput.TYPE, searchInput.type());
assertThat(searchInput.getTimeout(), equalTo(timeout != null ? timeout : TimeValue.timeValueSeconds(30))); // 30s is the default
}
@Test
@ -272,7 +274,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.source(searchSource()
.query(boolQuery().must(matchQuery("event_type", "a")).filter(rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null));
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -308,7 +310,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
.source(searchSource()
.query(filteredQuery(matchQuery("event_type", "a"), rangeQuery("_timestamp").from("{{ctx.trigger.scheduled_time}}||-30s").to("{{ctx.trigger.triggered_time}}"))));
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null));
XContentBuilder builder = jsonBuilder().value(new SearchInput(request, null, null));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();

View File

@ -223,7 +223,7 @@ public final class WatcherTestUtils {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
new ExecutableScriptCondition(new ScriptCondition(Script.inline("return true").build()), logger, scriptService),
new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, client, new DynamicIndexName.Parser()),
new ExecutableSearchTransform(new SearchTransform(transformRequest, null), logger, client, new DynamicIndexName.Parser()),
new TimeValue(0),
new ExecutableActions(actions),
metadata,

View File

@ -14,6 +14,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -245,6 +246,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
SearchType searchType = getRandomSupportedSearchType();
String templateName = randomBoolean() ? null : "template1";
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject("request");
if (indices != null) {
builder.array("indices", indices);
}
@ -272,6 +274,11 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.endObject();
builder.endObject();
TimeValue readTimeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null;
if (readTimeout != null) {
builder.field("timeout", readTimeout);
}
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -292,12 +299,14 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
assertThat(executable.transform().getRequest().templateSource().toUtf8(), equalTo("{\"file\":\"template1\"}"));
}
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
assertThat(executable.transform().getTimeout(), equalTo(readTimeout != null ? readTimeout : TimeValue.timeValueSeconds(30))); // 30s is the default
}
@Test
public void testParser_WithIndexNames() throws Exception {
SearchType searchType = getRandomSupportedSearchType();
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject("request");
builder.array("indices", "idx", "<idx-{now/d-3d}>");
if (searchType != null) {
builder.field("search_type", searchType.name());
@ -311,6 +320,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
.endObject();
builder.endObject();
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();

View File

@ -133,7 +133,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
}
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, never()).search(any(SearchRequest.class));
verify(clientProxy, never()).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, never()).clearScroll(anyString());
}
@ -160,7 +160,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse = mockSearchResponse(1, 0, 0);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
@ -173,7 +173,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
}
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
@ -200,7 +200,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse = mockSearchResponse(1, 1, 0);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
@ -211,7 +211,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
assertThat(watchStore.watches().size(), equalTo(0));
verify(templateUtils, times(1)).putTemplate("watches", null);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
@ -238,7 +238,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse1);
BytesReference source = new BytesArray("{}");
InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new StringText("type"), Collections.<String, SearchHitField>emptyMap());
@ -266,7 +266,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
assertThat(watchStore.watches().size(), equalTo(2));
verify(templateUtils, times(1)).putTemplate("watches", null);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}

View File

@ -358,17 +358,18 @@ public class WatchTests extends ElasticsearchTestCase {
private ExecutableTransform randomTransform() {
String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE);
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(5) : null;
switch (type) {
case ScriptTransform.TYPE:
return new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService);
case SearchTransform.TYPE:
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser);
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), logger, client, indexNameParser);
default: // chain
ChainTransform chainTransform = new ChainTransform(ImmutableList.of(
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)),
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout),
new ScriptTransform(Script.inline("_script").build())));
return new ExecutableChainTransform(chainTransform, logger, ImmutableList.<ExecutableTransform>of(
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, client, indexNameParser),
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS), timeout), logger, client, indexNameParser),
new ExecutableScriptTransform(new ScriptTransform(Script.inline("_script").build()), logger, scriptService)));
}
}
@ -392,7 +393,7 @@ public class WatchTests extends ElasticsearchTestCase {
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), randomThrottler(), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer)));
}
if (randomBoolean()) {
IndexAction action = new IndexAction("_index", "_type", null);
IndexAction action = new IndexAction("_index", "_type", null, null);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomTransform(), new ExecutableIndexAction(action, logger, client, indexNameParser)));
}
if (randomBoolean()) {