Fixes a bug that prevents the actual job in the schedule engine not being adjusted if the watch's interval is being updated with the same interval unit.
Closes elastic/elasticsearch#388 Original commit: elastic/x-pack-elasticsearch@ecf991f42c
This commit is contained in:
parent
cf23d54aed
commit
e1beb6e9b3
|
@ -174,7 +174,7 @@ public class IntervalSchedule implements Schedule {
|
|||
|
||||
Interval interval = (Interval) o;
|
||||
|
||||
if (unit.millis(duration) != interval.unit.millis(duration)) return false;
|
||||
if (unit.millis(duration) != interval.unit.millis(interval.duration)) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -167,7 +167,13 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|||
ActiveSchedule[] newSchedules = new ActiveSchedule[schedules.length];
|
||||
ImmutableMap.Builder<String, ActiveSchedule> builder = ImmutableMap.builder();
|
||||
for (int i = 0; i < schedules.length; i++) {
|
||||
ActiveSchedule sched = schedules[i].name.equals(schedule.name) ? schedule : schedules[i];
|
||||
final ActiveSchedule sched;
|
||||
if (schedules[i].name.equals(schedule.name)) {
|
||||
sched = schedule;
|
||||
schedules[i].cancel();
|
||||
} else {
|
||||
sched = schedules[i];
|
||||
}
|
||||
newSchedules[i] = sched;
|
||||
builder.put(sched.name, sched);
|
||||
}
|
||||
|
|
|
@ -5,10 +5,12 @@
|
|||
*/
|
||||
package org.elasticsearch.watcher.test.integration;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
@ -248,6 +250,42 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
|
|||
assertThat(count, equalTo(findNumberOfPerformedActions("_name")));
|
||||
}
|
||||
|
||||
@Test
|
||||
@LuceneTestCase.Slow
|
||||
public void testModifyWatchWithSameUnit() throws Exception {
|
||||
if (timeWarped()) {
|
||||
logger.info("Skipping testModifyWatches_ because timewarp is enabled");
|
||||
return;
|
||||
}
|
||||
|
||||
WatchSourceBuilder source = watchBuilder()
|
||||
.trigger(schedule(interval("1s")))
|
||||
.input(simpleInput("key", "value"))
|
||||
.throttlePeriod(TimeValue.timeValueSeconds(0))
|
||||
.addAction("_id", loggingAction("hello {{ctx.watcher_id}}!"));
|
||||
watcherClient().preparePutWatch("_name")
|
||||
.setSource(source)
|
||||
.get();
|
||||
|
||||
Thread.sleep(5000);
|
||||
assertWatchWithMinimumPerformedActionsCount("_name", 5, false);
|
||||
|
||||
source = watchBuilder()
|
||||
.trigger(schedule(interval("100s")))
|
||||
.throttlePeriod(TimeValue.timeValueSeconds(0))
|
||||
.input(simpleInput("key", "value"))
|
||||
.addAction("_id", loggingAction("hello {{ctx.watcher_id}}!"));
|
||||
watcherClient().preparePutWatch("_name")
|
||||
.setSource(source)
|
||||
.get();
|
||||
|
||||
// Wait one second to be sure that the scheduler engine has executed any previous job instance of the watch
|
||||
Thread.sleep(1000);
|
||||
long before = historyRecordsCount("_name");
|
||||
Thread.sleep(5000);
|
||||
assertThat("Watch has been updated to 100s interval, so no new records should have been added.", historyRecordsCount("_name"), equalTo(before));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConditionSearchWithSource() throws Exception {
|
||||
String variable = randomFrom("ctx.execution_time", "ctx.trigger.scheduled_time", "ctx.trigger.triggered_time");
|
||||
|
|
Loading…
Reference in New Issue