From 4c3300bf5791d3525e88afb490f425d300e065d3 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Wed, 4 Nov 2020 10:15:46 -0700 Subject: [PATCH] Fix job scheduling for same scheduled time (#64598) The SchedulerEngine used by SLM uses a custom runnable that will schedule itself for its next execution if there is one to run. For the majority of jobs, the next execution could be many hours or days away. Because the job is scheduled so far in advance, there is a chance that time drifts on the machine, or even that time varies from core to core, so there is no guarantee that the job actually runs on or after the scheduled time. This can cause some jobs to reschedule themselves for the same scheduled time even if they ran only a millisecond prior to the scheduled time, which causes unexpected actions to be taken, such as what appear to be duplicated snapshots. This change resolves this by comparing the triggered time against the scheduled time and computing the next run from the later of the two, ensuring that we do not have unexpected job runs. 
Relates #63754 Backport of #64501 --- .../xpack/core/scheduler/SchedulerEngine.java | 35 ++++++++++++++++--- .../core/scheduler/SchedulerEngineTests.java | 23 ++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index 06b03636d05..ecf073cf347 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -188,6 +188,11 @@ public class SchedulerEngine { } } + // for testing + ActiveSchedule getSchedule(String jobId) { + return schedules.get(jobId); + } + class ActiveSchedule implements Runnable { private final String name; @@ -195,7 +200,7 @@ public class SchedulerEngine { private final long startTime; private ScheduledFuture future; - private long scheduledTime; + private long scheduledTime = -1; ActiveSchedule(String name, Schedule schedule, long startTime) { this.name = name; @@ -223,10 +228,10 @@ public class SchedulerEngine { scheduleNextRun(triggeredTime); } - private void scheduleNextRun(long currentTime) { - this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, currentTime); + private void scheduleNextRun(long triggeredTime) { + this.scheduledTime = computeNextScheduledTime(triggeredTime); if (scheduledTime != -1) { - long delay = Math.max(0, scheduledTime - currentTime); + long delay = Math.max(0, scheduledTime - clock.millis()); try { synchronized (this) { if (future == null || future.isCancelled() == false) { @@ -242,6 +247,28 @@ public class SchedulerEngine { } } + // for testing + long getScheduledTime() { + return scheduledTime; + } + + long computeNextScheduledTime(long triggeredTime) { + // multiple time sources + multiple cpus + ntp + VMs means you can't trust time ever! 
+ // scheduling happens far enough in advance in most cases that time can drift and we + // may execute at some point before the scheduled time. There can also be time differences + // between the CPU cores and/or the clock used by the threadpool and that used by this class + // for scheduling. Regardless, we shouldn't reschedule to execute again until after the + // scheduled time. + final long scheduleAfterTime; + if (scheduledTime != -1 && triggeredTime < scheduledTime) { + scheduleAfterTime = scheduledTime; + } else { + scheduleAfterTime = triggeredTime; + } + + return schedule.nextScheduledTimeAfter(startTime, scheduleAfterTime); + } + public synchronized void cancel() { FutureUtils.cancel(future); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java index 63814773d9c..4469210ffe6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java @@ -11,10 +11,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.ActiveSchedule; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job; import org.mockito.ArgumentCaptor; import java.time.Clock; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -184,6 +186,27 @@ public class SchedulerEngineTests extends ESTestCase { } } + public void testNextScheduledTimeAfterCurrentScheduledTime() throws Exception { + final Clock clock = Clock.fixed(Clock.systemUTC().instant(), ZoneId.of("UTC")); + final long oneHourMillis = 
TimeUnit.HOURS.toMillis(1L); + final String jobId = randomAlphaOfLength(4); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, clock); + try { + engine.add(new Job(jobId, ((startTime, now) -> now + oneHourMillis))); + + ActiveSchedule activeSchedule = engine.getSchedule(jobId); + assertNotNull(activeSchedule); + assertEquals(clock.millis() + oneHourMillis, activeSchedule.getScheduledTime()); + + assertEquals(clock.millis() + oneHourMillis + oneHourMillis, + activeSchedule.computeNextScheduledTime(clock.millis() - randomIntBetween(1, 999))); + assertEquals(clock.millis() + oneHourMillis + oneHourMillis, + activeSchedule.computeNextScheduledTime(clock.millis() + TimeUnit.SECONDS.toMillis(10L))); + } finally { + engine.stop(); + } + } + private void assertFailedListenerLogMessage(Logger mockLogger, int times) { final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class); final ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class);