Add DateMath support to ScheduledTriggerEvent

This change changes how the ScheduledTriggerEvent is parsed to parse using DateMath instead of just as a date. This will allow the execute API to use such constructs as
```
POST _watcher/watch/test_watch/_execute
{
  "ignore_throttle" : true,
  "trigger_event" : {
    "schedule" : {
        "triggered_time": "now-5h",
        "scheduled_time": "now"
    }
  }
}
```

Fixes: elastic/elasticsearch#374

Original commit: elastic/x-pack-elasticsearch@fa286b217e
This commit is contained in:
Brian Murphy 2015-05-07 09:57:30 -04:00
parent 6f2429afb4
commit f78bc8dcb2
9 changed files with 114 additions and 24 deletions

View File

@ -66,9 +66,6 @@ public class RestExecuteWatchAction extends WatcherRestHandler {
ExecuteWatchRequestBuilder executeWatchRequestBuilder = client.prepareExecuteWatch(watchId); ExecuteWatchRequestBuilder executeWatchRequestBuilder = client.prepareExecuteWatch(watchId);
TriggerEvent triggerEvent = null; TriggerEvent triggerEvent = null;
if (request.content() != null && request.content().length() != 0) { if (request.content() != null && request.content().length() != 0) {
XContentParser parser = XContentHelper.createParser(request.content()); XContentParser parser = XContentHelper.createParser(request.content());
parser.nextToken(); parser.nextToken();

View File

@ -5,8 +5,10 @@
*/ */
package org.elasticsearch.watcher.support; package org.elasticsearch.watcher.support;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.joda.time.DateTimeZone;
@ -14,15 +16,18 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.core.DateFieldMapper; import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.clock.Clock;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/** /**
* *
*/ */
public class WatcherDateUtils { public class WatcherDateUtils {
public static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER; public static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
public static final DateMathParser dateMathParser = new DateMathParser(dateTimeFormatter, TimeUnit.SECONDS);
private WatcherDateUtils() { private WatcherDateUtils() {
} }
@ -40,6 +45,35 @@ public class WatcherDateUtils {
return dateTimeFormatter.printer().print(date); return dateTimeFormatter.printer().print(date);
} }
public static DateTime parseDateMath(String fieldName, XContentParser parser, DateTimeZone timeZone, Clock clock) throws IOException {
if (parser.currentToken() == XContentParser.Token.VALUE_NULL) {
throw new ParseException("could not parse date/time expected date field [{}] to not be null but was null", fieldName);
}
return parseDateMathOrNull(fieldName, parser, timeZone, clock);
}
public static DateTime parseDateMathOrNull(String fieldName, XContentParser parser, DateTimeZone timeZone, Clock clock) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_NUMBER) {
return new DateTime(parser.longValue(), timeZone);
}
if (token == XContentParser.Token.VALUE_STRING) {
try {
return parseDateMath(parser.text(), timeZone, clock);
} catch (ElasticsearchParseException epe) {
throw new ParseException("could not parse date/time. expected date field [{}] to be either a number or a DateMath string but found [{}] instead", epe, fieldName, parser.text());
}
}
if (token == XContentParser.Token.VALUE_NULL) {
return null;
}
throw new ParseException("could not parse date/time. expected date field [{}] to be either a number or a string but found [{}] instead", fieldName, token);
}
public static DateTime parseDateMath(String valueString, DateTimeZone timeZone, final Clock clock) {
return new DateTime(dateMathParser.parse(valueString, new ClockNowCallable(clock)), timeZone);
}
public static DateTime parseDate(String fieldName, XContentParser parser, DateTimeZone timeZone) throws IOException { public static DateTime parseDate(String fieldName, XContentParser parser, DateTimeZone timeZone) throws IOException {
XContentParser.Token token = parser.currentToken(); XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_NUMBER) { if (token == XContentParser.Token.VALUE_NUMBER) {
@ -80,8 +114,25 @@ public class WatcherDateUtils {
} }
public static class ParseException extends WatcherException { public static class ParseException extends WatcherException {
public ParseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
public ParseException(String msg, Object... args) { public ParseException(String msg, Object... args) {
super(msg, args); super(msg, args);
} }
} }
private static class ClockNowCallable implements Callable<Long> {
private final Clock clock;
ClockNowCallable(Clock clock){
this.clock = clock;
}
@Override
public Long call() throws Exception {
return clock.millis();
}
}
} }

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.AbstractTriggerEngine; import org.elasticsearch.watcher.trigger.AbstractTriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.TriggerService;
@ -20,10 +21,12 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<Schedu
public static final String TYPE = ScheduleTrigger.TYPE; public static final String TYPE = ScheduleTrigger.TYPE;
protected final ScheduleRegistry scheduleRegistry; protected final ScheduleRegistry scheduleRegistry;
protected final Clock clock;
public ScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry) { public ScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings); super(settings);
this.scheduleRegistry = scheduleRegistry; this.scheduleRegistry = scheduleRegistry;
this.clock = clock;
} }
@Override @Override
@ -39,6 +42,6 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<Schedu
@Override @Override
public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException { public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(watchId, context, parser); return ScheduleTriggerEvent.parse(parser, watchId, context, clock);
} }
} }

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.WatcherDateUtils; import org.elasticsearch.watcher.support.WatcherDateUtils;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerEvent;
import java.io.IOException; import java.io.IOException;
@ -51,7 +52,7 @@ public class ScheduleTriggerEvent extends TriggerEvent {
return builder.endObject(); return builder.endObject();
} }
public static ScheduleTriggerEvent parse(String watchId, String context, XContentParser parser) throws IOException { public static ScheduleTriggerEvent parse(XContentParser parser, String watchId, String context, Clock clock) throws IOException {
DateTime triggeredTime = null; DateTime triggeredTime = null;
DateTime scheduledTime = null; DateTime scheduledTime = null;
@ -62,13 +63,14 @@ public class ScheduleTriggerEvent extends TriggerEvent {
currentFieldName = parser.currentName(); currentFieldName = parser.currentName();
} else if (Field.TRIGGERED_TIME.match(currentFieldName)) { } else if (Field.TRIGGERED_TIME.match(currentFieldName)) {
try { try {
triggeredTime = WatcherDateUtils.parseDate(currentFieldName, parser, UTC); triggeredTime = WatcherDateUtils.parseDateMath(currentFieldName, parser, UTC, clock);
} catch (WatcherDateUtils.ParseException pe) { } catch (WatcherDateUtils.ParseException pe) {
//Failed to parse as a date try datemath parsing
throw new ParseException("could not parse [{}] trigger event for [{}] for watch [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, watchId, currentFieldName); throw new ParseException("could not parse [{}] trigger event for [{}] for watch [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, watchId, currentFieldName);
} }
} else if (Field.SCHEDULED_TIME.match(currentFieldName)) { } else if (Field.SCHEDULED_TIME.match(currentFieldName)) {
try { try {
scheduledTime = WatcherDateUtils.parseDate(currentFieldName, parser, UTC); scheduledTime = WatcherDateUtils.parseDateMath(currentFieldName, parser, UTC, clock);
} catch (WatcherDateUtils.ParseException pe) { } catch (WatcherDateUtils.ParseException pe) {
throw new ParseException("could not parse [{}] trigger event for [{}] for watch [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, watchId, currentFieldName); throw new ParseException("could not parse [{}] trigger event for [{}] for watch [{}]. failed to parse date field [{}]", pe, ScheduleTriggerEngine.TYPE, context, watchId, currentFieldName);
} }

View File

@ -29,14 +29,12 @@ import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
*/ */
public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final Clock clock;
private volatile Schedules schedules; private volatile Schedules schedules;
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
@Inject @Inject
public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry); super(settings, scheduleRegistry, clock);
this.clock = clock;
} }
@Override @Override

View File

@ -27,18 +27,15 @@ import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
*/ */
public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final Clock clock;
private final TimeValue tickInterval; private final TimeValue tickInterval;
private volatile Map<String, ActiveSchedule> schedules; private volatile Map<String, ActiveSchedule> schedules;
private Ticker ticker; private Ticker ticker;
@Inject @Inject
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry); super(settings, scheduleRegistry, clock);
this.tickInterval = settings.getAsTime("watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500)); this.tickInterval = settings.getAsTime("watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500));
this.schedules = new ConcurrentHashMap<>(); this.schedules = new ConcurrentHashMap<>();
this.clock = clock;
} }
@Override @Override

View File

@ -34,13 +34,12 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private final ESLogger logger; private final ESLogger logger;
private final ConcurrentMap<String, Job> jobs = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Job> jobs = new ConcurrentHashMap<>();
private final Clock clock;
@Inject @Inject
public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) { public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry); super(settings, scheduleRegistry, clock);
this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings); this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings);
this.clock = clock;
} }
@Override @Override
@ -50,7 +49,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
@Override @Override
public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException { public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String watchId, String context, XContentParser parser) throws IOException {
return ScheduleTriggerEvent.parse(watchId, context, parser); return ScheduleTriggerEvent.parse(parser, watchId, context, clock);
} }
@Override @Override

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.trigger.schedule;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
/**
*/
public class ScheduleTriggerEventTests extends ElasticsearchTestCase {
@Test
@Repeat(iterations = 10)
public void testParser_RandomDateMath() throws Exception {
String triggeredTime = randomFrom("now", "now+5m", "2015-05-07T22:24:41.254Z", "2015-05-07T22:24:41.254Z||-5m");
String scheduledTime = randomFrom("now", "now-5m", "2015-05-07T22:24:41.254Z", "2015-05-07T22:24:41.254Z||+5h");
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(ScheduleTriggerEvent.Field.SCHEDULED_TIME.getPreferredName(), scheduledTime);
jsonBuilder.field(ScheduleTriggerEvent.Field.TRIGGERED_TIME.getPreferredName(), triggeredTime);
jsonBuilder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
ScheduleTriggerEvent scheduleTriggerEvent = ScheduleTriggerEvent.parse(parser, "_id", "_context", SystemClock.INSTANCE);
assertThat(scheduleTriggerEvent.scheduledTime().isAfter(0), is(true));
assertThat(scheduleTriggerEvent.triggeredTime().isAfter(0), is(true));
}
}

View File

@ -57,6 +57,7 @@ import org.elasticsearch.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.watcher.license.LicenseService; import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.SystemClock; import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.http.HttpClient; import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpMethod; import org.elasticsearch.watcher.support.http.HttpMethod;
@ -132,7 +133,7 @@ public class WatchTests extends ElasticsearchTestCase {
Schedule schedule = randomSchedule(); Schedule schedule = randomSchedule();
Trigger trigger = new ScheduleTrigger(schedule); Trigger trigger = new ScheduleTrigger(schedule);
ScheduleRegistry scheduleRegistry = registry(schedule); ScheduleRegistry scheduleRegistry = registry(schedule);
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry); TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry, SystemClock.INSTANCE);
TriggerService triggerService = new TriggerService(ImmutableSettings.EMPTY, ImmutableSet.of(triggerEngine)); TriggerService triggerService = new TriggerService(ImmutableSettings.EMPTY, ImmutableSet.of(triggerEngine));
SecretService secretService = new SecretService.PlainText(); SecretService secretService = new SecretService.PlainText();
@ -178,7 +179,7 @@ public class WatchTests extends ElasticsearchTestCase {
@Test @Test
public void testParser_BadActions() throws Exception { public void testParser_BadActions() throws Exception {
ScheduleRegistry scheduleRegistry = registry(randomSchedule()); ScheduleRegistry scheduleRegistry = registry(randomSchedule());
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry); TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry, SystemClock.INSTANCE);
TriggerService triggerService = new TriggerService(ImmutableSettings.EMPTY, ImmutableSet.of(triggerEngine)); TriggerService triggerService = new TriggerService(ImmutableSettings.EMPTY, ImmutableSet.of(triggerEngine));
SecretService secretService = new SecretService.PlainText(); SecretService secretService = new SecretService.PlainText();
ExecutableCondition condition = randomCondition(); ExecutableCondition condition = randomCondition();
@ -374,8 +375,8 @@ public class WatchTests extends ElasticsearchTestCase {
static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine { static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine {
public ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry) { public ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry, Clock clock) {
super(settings, registry); super(settings, registry, clock);
} }
@Override @Override