Fix remaining fractional TimeValue issues with watcher

Removes the remaining spots where watcher makes fractional TimeValues.

Closes elastic/elasticsearch#3231

Original commit: elastic/x-pack-elasticsearch@22b0d37ed3
This commit is contained in:
Nik Everett 2016-09-02 10:06:12 -04:00
parent cd9add5350
commit 89ce4ebb08
30 changed files with 139 additions and 36 deletions

View File

@ -172,12 +172,59 @@ def generate_watcher_index(client, version):
"interval": "1s"
}
},
"input" : {
"search" : {
"timeout": "100s",
"request" : {
"indices" : [ ".watches" ],
"body" : {
"query" : { "match_all" : {}},
"size": 1
},
}
}
},
"condition" : {
"always" : {}
},
"throttle_period": "1s",
"actions" : {
"index_payload" : {
"transform" : {
"search" : {
"request" : {
"body" : { "size": 1, "query" : { "match_all" : {} }}
},
"timeout": "100s"
}
},
"index" : {
"index" : "bwc_watch_index",
"doc_type" : "bwc_watch_type",
"timeout": "100s"
}
}
}
}
response = requests.put('http://localhost:9200/_watcher/watch/bwc_watch', auth=('es_admin', '0123456789'), data=json.dumps(body))
logging.info('PUT watch response: ' + response.text)
if (response.status_code != 201) :
raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_watch did not succeed!')
logging.info('Adding a watch with "fun" throttle periods')
body = {
"trigger" : {
"schedule": {
"interval": "1s"
}
},
"condition" : {
"never" : {}
},
"throttle_period": "100s",
"actions" : {
"index_payload" : {
"throttle_period": "100s",
"transform" : {
"search" : {
"request" : {
@ -192,10 +239,10 @@ def generate_watcher_index(client, version):
}
}
}
response = requests.put('http://localhost:9200/_watcher/watch/bwc_watch', auth=('es_admin', '0123456789'), data=json.dumps(body))
response = requests.put('http://localhost:9200/_watcher/watch/bwc_throttle_period', auth=('es_admin', '0123456789'), data=json.dumps(body))
logging.info('PUT watch response: ' + response.text)
if (response.status_code != 201) :
raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_watch did not succeed!')
raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_throttle_period did not succeed!')
if parse_version(version) < parse_version('2.3.0'):
logging.info('Skipping watch with a funny read timeout because email attachement is not supported by this version')

View File

@ -33,6 +33,8 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
*
*/
@ -180,7 +182,8 @@ public class ActionWrapper implements ToXContent {
builder.startObject();
TimeValue throttlePeriod = throttler.throttlePeriod();
if (throttlePeriod != null) {
builder.field(Throttler.Field.THROTTLE_PERIOD.getPreferredName(), throttlePeriod);
builder.timeValueField(Throttler.Field.THROTTLE_PERIOD.getPreferredName(),
Throttler.Field.THROTTLE_PERIOD_HUMAN.getPreferredName(), throttlePeriod);
}
if (condition != null) {
builder.startObject(Watch.Field.CONDITION.getPreferredName())
@ -218,8 +221,10 @@ public class ActionWrapper implements ToXContent {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Transform.Field.TRANSFORM)) {
transform = transformRegistry.parse(watchId, parser);
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Throttler.Field.THROTTLE_PERIOD)) {
throttlePeriod = timeValueMillis(parser.longValue());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Throttler.Field.THROTTLE_PERIOD_HUMAN)) {
try {
throttlePeriod = WatcherDateTimeUtils.parseTimeValue(parser, Throttler.Field.THROTTLE_PERIOD.toString());
throttlePeriod = WatcherDateTimeUtils.parseTimeValue(parser, Throttler.Field.THROTTLE_PERIOD_HUMAN.toString());
} catch (ElasticsearchParseException pe) {
throw new ElasticsearchParseException("could not parse action [{}/{}]. failed to parse field [{}] as time value",
pe, watchId, actionId, currentFieldName);

View File

@ -19,6 +19,8 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
*
*/
@ -96,7 +98,7 @@ public class IndexAction implements Action {
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
}
if (timeout != null) {
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
builder.timeValueField(Field.TIMEOUT.getPreferredName(), Field.TIMEOUT_HUMAN.getPreferredName(), timeout);
}
if (dynamicNameTimeZone != null) {
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
@ -123,13 +125,21 @@ public class IndexAction implements Action {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse index name value for " +
"field [{}]", pe, TYPE, watchId, actionId, currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT)) {
timeout = timeValueMillis(parser.longValue());
} else {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected number field [{}]", TYPE,
watchId, actionId, currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.DOC_TYPE)) {
docType = parser.text();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.EXECUTION_TIME_FIELD)) {
executionTimeField = parser.text();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT)) {
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT_HUMAN)) {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.DYNAMIC_NAME_TIMEZONE)) {
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
@ -266,7 +276,8 @@ public class IndexAction implements Action {
ParseField SOURCE = new ParseField("source");
ParseField RESPONSE = new ParseField("response");
ParseField REQUEST = new ParseField("request");
ParseField TIMEOUT = new ParseField("timeout");
ParseField TIMEOUT = new ParseField("timeout_in_millis");
ParseField TIMEOUT_HUMAN = new ParseField("timeout");
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
}
}

View File

@ -43,6 +43,7 @@ public interface Throttler {
}
interface Field {
ParseField THROTTLE_PERIOD = new ParseField("throttle_period");
ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
}
}

View File

@ -155,7 +155,8 @@ public class WatchSourceBuilder implements ToXContent {
}
if (defaultThrottlePeriod != null) {
builder.field(Watch.Field.THROTTLE_PERIOD.getPreferredName(), defaultThrottlePeriod.toString());
builder.timeValueField(Watch.Field.THROTTLE_PERIOD.getPreferredName(),
Watch.Field.THROTTLE_PERIOD_HUMAN.getPreferredName(), defaultThrottlePeriod);
}
builder.startObject(Watch.Field.ACTIONS.getPreferredName());
@ -203,7 +204,8 @@ public class WatchSourceBuilder implements ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (throttlePeriod != null) {
builder.field(Throttler.Field.THROTTLE_PERIOD.getPreferredName(), throttlePeriod);
builder.timeValueField(Throttler.Field.THROTTLE_PERIOD.getPreferredName(),
Throttler.Field.THROTTLE_PERIOD_HUMAN.getPreferredName(), throttlePeriod);
}
if (condition != null) {
builder.startObject(Watch.Field.CONDITION.getPreferredName())

View File

@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Set;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
public class SearchInput implements Input {
@ -99,7 +100,7 @@ public class SearchInput implements Input {
builder.field(Field.EXTRACT.getPreferredName(), extractKeys);
}
if (timeout != null) {
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
builder.timeValueField(Field.TIMEOUT.getPreferredName(), Field.TIMEOUT_HUMAN.getPreferredName(), timeout);
}
if (dynamicNameTimeZone != null) {
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
@ -144,7 +145,10 @@ public class SearchInput implements Input {
watchId, currentFieldName);
}
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT)) {
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
timeout = timeValueMillis(parser.longValue());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT_HUMAN)) {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.DYNAMIC_NAME_TIMEZONE)) {
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
@ -238,7 +242,8 @@ public class SearchInput implements Input {
public interface Field extends Input.Field {
ParseField REQUEST = new ParseField("request");
ParseField EXTRACT = new ParseField("extract");
ParseField TIMEOUT = new ParseField("timeout");
ParseField TIMEOUT = new ParseField("timeout_in_millis");
ParseField TIMEOUT_HUMAN = new ParseField("timeout");
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
}
}

View File

@ -23,6 +23,8 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
public class SearchTransform implements Transform {
public static final String TYPE = "search";
@ -81,7 +83,7 @@ public class SearchTransform implements Transform {
builder.field(Field.REQUEST.getPreferredName(), request);
}
if (timeout != null) {
builder.field(Field.TIMEOUT.getPreferredName(), timeout);
builder.timeValueField(Field.TIMEOUT.getPreferredName(), Field.TIMEOUT_HUMAN.getPreferredName(), timeout);
}
if (dynamicNameTimeZone != null) {
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
@ -110,7 +112,10 @@ public class SearchTransform implements Transform {
TYPE, watchId, currentFieldName);
}
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT)) {
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT.toString());
timeout = timeValueMillis(parser.longValue());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMEOUT_HUMAN)) {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.DYNAMIC_NAME_TIMEZONE)) {
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
@ -192,7 +197,8 @@ public class SearchTransform implements Transform {
public interface Field extends Transform.Field {
ParseField REQUEST = new ParseField("request");
ParseField TIMEOUT = new ParseField("timeout");
ParseField TIMEOUT = new ParseField("timeout_in_millis");
ParseField TIMEOUT_HUMAN = new ParseField("timeout");
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
}
}

View File

@ -51,6 +51,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentHelper.createParser;
import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException;
@ -183,11 +184,8 @@ public class Watch implements TriggerEngine.Job, ToXContent {
builder.field(Field.TRANSFORM.getPreferredName()).startObject().field(transform.type(), transform, params).endObject();
}
if (throttlePeriod != null) {
if (builder.humanReadable()) {
builder.field(Field.THROTTLE_PERIOD.getPreferredName(), throttlePeriod.format(PeriodType.seconds()));
} else {
builder.field(Field.THROTTLE_PERIOD.getPreferredName(), throttlePeriod);
}
builder.timeValueField(Field.THROTTLE_PERIOD.getPreferredName(),
Field.THROTTLE_PERIOD_HUMAN.getPreferredName(), throttlePeriod);
}
builder.field(Field.ACTIONS.getPreferredName(), actions, params);
if (metadata != null) {
@ -307,8 +305,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TRANSFORM)) {
transform = transformRegistry.parse(id, parser);
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.THROTTLE_PERIOD)) {
throttlePeriod = timeValueMillis(parser.longValue());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.THROTTLE_PERIOD_HUMAN)) {
// Parser for human specified and 2.x backwards compatible throttle period
try {
throttlePeriod = WatcherDateTimeUtils.parseTimeValue(parser, Field.THROTTLE_PERIOD.toString());
throttlePeriod = WatcherDateTimeUtils.parseTimeValue(parser, Field.THROTTLE_PERIOD_HUMAN.toString());
} catch (ElasticsearchParseException pe) {
throw new ElasticsearchParseException("could not parse watch [{}]. failed to parse time value for field [{}]",
pe, id, currentFieldName);
@ -360,7 +361,8 @@ public class Watch implements TriggerEngine.Job, ToXContent {
ParseField CONDITION = new ParseField("condition");
ParseField ACTIONS = new ParseField("actions");
ParseField TRANSFORM = new ParseField("transform");
ParseField THROTTLE_PERIOD = new ParseField("throttle_period");
ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
ParseField METADATA = new ParseField("metadata");
ParseField STATUS = new ParseField("_status");
}

View File

@ -37,6 +37,11 @@
"index" : false,
"doc_values" : false
},
"throttle_period_in_millis": {
"type" : "long",
"index" : false,
"doc_values" : false
},
"transform": {
"type" : "object",
"enabled" : false,

View File

@ -35,7 +35,6 @@ import static org.hamcrest.Matchers.not;
/**
* Tests for watcher indexes created before 5.0.
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/pull/3342")
public class OldWatcherIndicesBackwardsCompatibilityIT extends AbstractOldXPackIndicesBackwardsCompatibilityTestCase {
@Override
public Settings nodeSettings(int ord) {
@ -96,11 +95,31 @@ public class OldWatcherIndicesBackwardsCompatibilityIT extends AbstractOldXPackI
assertTrue(bwcWatch.isFound());
assertNotNull(bwcWatch.getSource());
Map<String, Object> source = bwcWatch.getSource().getAsMap();
assertEquals(1000, source.get("throttle_period_in_millis"));
Map<?, ?> input = (Map<?, ?>) source.get("input");
Map<?, ?> search = (Map<?, ?>) input.get("search");
assertEquals(96000, search.get("timeout_in_millis")); // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
Map<?, ?> actions = (Map<?, ?>) source.get("actions");
Map<?, ?> indexPayload = (Map<?, ?>) actions.get("index_payload");
Map<?, ?> transform = (Map<?, ?>) indexPayload.get("transform");
search = (Map<?, ?>) transform.get("search");
assertEquals(96000, search.get("timeout_in_millis")); // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
Map<?, ?> index = (Map<?, ?>) indexPayload.get("index");
assertEquals("bwc_watch_index", index.get("index"));
assertEquals("bwc_watch_type", index.get("doc_type"));
assertEquals(96000, index.get("timeout_in_millis")); // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
// Fetch a watch with "fun" throttle periods
bwcWatch = watcherClient.prepareGetWatch("bwc_throttle_period").get();
assertTrue(bwcWatch.isFound());
assertNotNull(bwcWatch.getSource());
source = bwcWatch.getSource().getAsMap();
// We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
assertEquals(96000, source.get("throttle_period_in_millis"));
actions = (Map<?, ?>) source.get("actions");
indexPayload = (Map<?, ?>) actions.get("index_payload");
// We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
assertEquals(96000, indexPayload.get("throttle_period_in_millis"));
if (version.onOrAfter(Version.V_2_3_0)) {
/* Fetch a watch with a funny timeout to verify loading fractional time values. This watch is only built in >= 2.3 because

View File

@ -18,7 +18,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
@ -49,8 +48,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.joda.time.DateTimeZone.UTC;
/**
*/
public class IndexActionTests extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
@ -191,7 +188,7 @@ public class IndexActionTests extends ESIntegTestCase {
}
TimeValue writeTimeout = randomBoolean() ? TimeValue.timeValueSeconds(randomInt(10)) : null;
if (writeTimeout != null) {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout);
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
}
builder.endObject();
Client client = client();

View File

@ -128,6 +128,7 @@ import java.util.Map;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
@ -198,7 +199,7 @@ public class WatchTests extends ESTestCase {
}
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), unmodifiableMap(actionsStatuses));
TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10));
TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10000));
Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus);
@ -335,7 +336,9 @@ public class WatchTests extends ESTestCase {
String type = randomFrom(SearchInput.TYPE, SimpleInput.TYPE);
switch (type) {
case SearchInput.TYPE:
SearchInput searchInput = searchInput(WatcherTestUtils.templateRequest(searchSource(), "idx")).build();
SearchInput searchInput = searchInput(WatcherTestUtils.templateRequest(searchSource(), "idx"))
.timeout(randomBoolean() ? null : timeValueSeconds(between(1, 10000)))
.build();
return new ExecutableSearchInput(searchInput, logger, client, searchTemplateService, null);
default:
SimpleInput simpleInput = InputBuilders.simpleInput(singletonMap("_key", "_val")).build();
@ -387,7 +390,7 @@ public class WatchTests extends ESTestCase {
private ExecutableTransform randomTransform() {
String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE);
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(5) : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
switch (type) {
case ScriptTransform.TYPE:
@ -432,7 +435,7 @@ public class WatchTests extends ESTestCase {
}
if (randomBoolean()) {
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
TimeValue timeout = randomBoolean() ? TimeValue.timeValueSeconds(30) : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
IndexAction action = new IndexAction("_index", "_type", null, timeout, timeZone);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(), randomCondition(), randomTransform(),
new ExecutableIndexAction(action, logger, client, null)));
@ -470,7 +473,7 @@ public class WatchTests extends ESTestCase {
}
private ActionThrottler randomThrottler() {
return new ActionThrottler(SystemClock.INSTANCE, randomBoolean() ? null : TimeValue.timeValueMinutes(randomIntBetween(3, 5)),
return new ActionThrottler(SystemClock.INSTANCE, randomBoolean() ? null : timeValueSeconds(randomIntBetween(1, 10000)),
licenseState);
}

View File

@ -53,4 +53,4 @@ teardown:
id: "my_watch1"
- match: { found : true}
- match: { _id: "my_watch1" }
- match: { watch.throttle_period: "10s" }
- match: { watch.throttle_period_in_millis: 10000 }

View File

@ -53,4 +53,4 @@ teardown:
id: "my_watch1"
- match: { found : true}
- match: { _id: "my_watch1" }
- match: { watch.actions.test_index.throttle_period: "10s" }
- match: { watch.actions.test_index.throttle_period_in_millis: 10000 }