Watcher: Only trigger a watch if new or schedule/changed (#35908)
The trigger engine did always create a new schedule data structure, when the watcher indexing listener called an add. However the indexing listener also called add, when the watch status was updated. This means, that upon a watch status update the watch got retriggered, potentially waiting a defined interval from the watch status update onwards, instead of waiting from the last run. This commit only updates the schedule in the trigger engine, if it actually has changed, otherwise the existing schedule will not be touched. This has two results 1. If a watch is updated by an execution, the existing interval will not be touched (meaning the scheduled time will not move forward). 2. If a watch is updated by a user, but the schedule is not changed, it will not be reset from the update (for example starting to count from 5 minutes again, if the interval was set to 5 minutes). Furthermore some minor cleanups were applied, making variables final in the ctor, preventing double creation of variables.
This commit is contained in:
parent
38725bd3a1
commit
a615ab454a
|
@ -11,6 +11,13 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This interface is used to implement watcher specific schedules, the existing implementations are either
|
||||
* based on a cron based or an interval based schedule
|
||||
*
|
||||
* In addition to the methods defined here, you also have to implement the equals() method to properly work
|
||||
* for the trigger engine implementations.
|
||||
*/
|
||||
public interface Schedule extends SchedulerEngine.Schedule, ToXContent {
|
||||
|
||||
String type();
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.joda.time.DateTime;
|
|||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -37,24 +38,23 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|||
positiveTimeSetting("xpack.watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500), Property.NodeScope);
|
||||
|
||||
private final TimeValue tickInterval;
|
||||
private volatile Map<String, ActiveSchedule> schedules;
|
||||
private Ticker ticker;
|
||||
private final Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
|
||||
private final Ticker ticker;
|
||||
|
||||
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
|
||||
super(scheduleRegistry, clock);
|
||||
this.tickInterval = TICKER_INTERVAL_SETTING.get(settings);
|
||||
this.schedules = new ConcurrentHashMap<>();
|
||||
this.ticker = new Ticker(Node.NODE_DATA_SETTING.get(settings));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start(Collection<Watch> jobs) {
|
||||
long startTime = clock.millis();
|
||||
Map<String, ActiveSchedule> schedules = new HashMap<>(jobs.size());
|
||||
Map<String, ActiveSchedule> startingSchedules = new HashMap<>(jobs.size());
|
||||
for (Watch job : jobs) {
|
||||
if (job.trigger() instanceof ScheduleTrigger) {
|
||||
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
|
||||
schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
|
||||
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
|
||||
}
|
||||
}
|
||||
// why are we calling putAll() here instead of assigning a brand
|
||||
|
@ -66,7 +66,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|||
// this method. The index operation however will run at the same time
|
||||
// as the reload, so if we clean out the old data structure here,
|
||||
// that can lead to that one watch not being triggered
|
||||
this.schedules.putAll(schedules);
|
||||
this.schedules.putAll(startingSchedules);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -84,7 +84,14 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|||
public void add(Watch watch) {
|
||||
assert watch.trigger() instanceof ScheduleTrigger;
|
||||
ScheduleTrigger trigger = (ScheduleTrigger) watch.trigger();
|
||||
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
|
||||
ActiveSchedule currentSchedule = schedules.get(watch.id());
|
||||
// only update the schedules data structure if the scheduled trigger really has changed, otherwise the time would be reset again
|
||||
// resulting in later executions, as the time would only count after a watch has been stored, as this code is triggered by the
|
||||
// watcher indexing listener
|
||||
// this also means that updating an existing watch would not retrigger the schedule time, if it remains the same schedule
|
||||
if (currentSchedule == null || currentSchedule.schedule.equals(trigger.getSchedule()) == false) {
|
||||
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,10 +105,10 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|||
for (ActiveSchedule schedule : schedules.values()) {
|
||||
long scheduledTime = schedule.check(triggeredTime);
|
||||
if (scheduledTime > 0) {
|
||||
logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name,
|
||||
new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC));
|
||||
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime, UTC),
|
||||
new DateTime(scheduledTime, UTC)));
|
||||
DateTime triggeredDateTime = new DateTime(triggeredTime, UTC);
|
||||
DateTime scheduledDateTime = new DateTime(scheduledTime, UTC);
|
||||
logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, triggeredDateTime, scheduledDateTime);
|
||||
events.add(new ScheduleTriggerEvent(schedule.name, triggeredDateTime, scheduledDateTime));
|
||||
if (events.size() >= 1000) {
|
||||
notifyListeners(events);
|
||||
events.clear();
|
||||
|
@ -113,6 +120,11 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
Map<String, ActiveSchedule> getSchedules() {
|
||||
return Collections.unmodifiableMap(schedules);
|
||||
}
|
||||
|
||||
protected void notifyListeners(List<TriggerEvent> events) {
|
||||
consumers.forEach(consumer -> consumer.accept(events));
|
||||
}
|
||||
|
|
|
@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
|||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
|
||||
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
|
||||
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
|
||||
|
@ -36,12 +35,13 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily;
|
|||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TickerScheduleEngineTests extends ESTestCase {
|
||||
|
||||
private TriggerEngine engine;
|
||||
private TickerScheduleTriggerEngine engine;
|
||||
protected ClockMock clock = ClockMock.frozen();
|
||||
|
||||
@Before
|
||||
|
@ -49,7 +49,7 @@ public class TickerScheduleEngineTests extends ESTestCase {
|
|||
engine = createEngine();
|
||||
}
|
||||
|
||||
private TriggerEngine createEngine() {
|
||||
private TickerScheduleTriggerEngine createEngine() {
|
||||
Settings settings = Settings.EMPTY;
|
||||
// having a low value here speeds up the tests tremendously, we still want to run with the defaults every now and then
|
||||
if (usually()) {
|
||||
|
@ -254,6 +254,22 @@ public class TickerScheduleEngineTests extends ESTestCase {
|
|||
assertThat(counter.get(), is(2));
|
||||
}
|
||||
|
||||
public void testAddOnlyWithNewSchedule() {
|
||||
engine.start(Collections.emptySet());
|
||||
|
||||
// add watch with schedule
|
||||
Watch oncePerSecondWatch = createWatch("_id", interval("1s"));
|
||||
engine.add(oncePerSecondWatch);
|
||||
TickerScheduleTriggerEngine.ActiveSchedule activeSchedule = engine.getSchedules().get("_id");
|
||||
engine.add(oncePerSecondWatch);
|
||||
assertThat(engine.getSchedules().get("_id"), is(activeSchedule));
|
||||
|
||||
// add watch with same id but different watch
|
||||
Watch oncePerMinuteWatch = createWatch("_id", interval("1m"));
|
||||
engine.add(oncePerMinuteWatch);
|
||||
assertThat(engine.getSchedules().get("_id"), not(is(activeSchedule)));
|
||||
}
|
||||
|
||||
private Watch createWatch(String name, Schedule schedule) {
|
||||
return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(),
|
||||
InternalAlwaysCondition.INSTANCE, null, null,
|
||||
|
|
Loading…
Reference in New Issue