diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java index 4fe3eeae689..95ba8bac2f2 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/IntervalSchedule.java @@ -174,7 +174,7 @@ public class IntervalSchedule implements Schedule { Interval interval = (Interval) o; - if (unit.millis(duration) != interval.unit.millis(duration)) return false; + if (unit.millis(duration) != interval.unit.millis(interval.duration)) return false; return true; } diff --git a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java index 64e50b3e2f6..29aab082a6a 100644 --- a/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java +++ b/src/main/java/org/elasticsearch/watcher/trigger/schedule/engine/SchedulerScheduleTriggerEngine.java @@ -167,7 +167,13 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine { ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length]; ImmutableMap.Builder builder = ImmutableMap.builder(); for (int i = 0; i < schedules.length; i++) { - ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i]; + final ActiveSchedule sched; + if (schedules[i].name.equals(schedule.name)) { + sched = schedule; + schedules[i].cancel(); + } else { + sched = schedules[i]; + } newSchedules[i] = sched; builder.put(sched.name, sched); } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java index c473b5ef3c8..76764334234 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java @@ -5,10 +5,12 @@ */ package org.elasticsearch.watcher.test.integration; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.script.ScriptService; @@ -248,6 +250,42 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests { assertThat(count, equalTo(findNumberOfPerformedActions("_name"))); } + @Test + @LuceneTestCase.Slow + public void testModifyWatchWithSameUnit() throws Exception { + if (timeWarped()) { + logger.info("Skipping testModifyWatches_ because timewarp is enabled"); + return; + } + + WatchSourceBuilder source = watchBuilder() + .trigger(schedule(interval("1s"))) + .input(simpleInput("key", "value")) + .throttlePeriod(TimeValue.timeValueSeconds(0)) + .addAction("_id", loggingAction("hello {{ctx.watcher_id}}!")); + watcherClient().preparePutWatch("_name") + .setSource(source) + .get(); + + Thread.sleep(5000); + assertWatchWithMinimumPerformedActionsCount("_name", 5, false); + + source = watchBuilder() + .trigger(schedule(interval("100s"))) + .throttlePeriod(TimeValue.timeValueSeconds(0)) + .input(simpleInput("key", "value")) + .addAction("_id", loggingAction("hello {{ctx.watcher_id}}!")); + watcherClient().preparePutWatch("_name") + .setSource(source) + .get(); + + // Wait one second to be sure that the scheduler engine has executed any previous job instance of the watch + Thread.sleep(1000); + long before = historyRecordsCount("_name"); + Thread.sleep(5000); + assertThat("Watch has been updated to 100s interval, so no new records should have been added.", historyRecordsCount("_name"), equalTo(before)); + } + @Test public void testConditionSearchWithSource() throws Exception { String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time");