Fix job scheduling for same scheduled time (#64598)
The SchedulerEngine used by SLM uses a custom runnable that will schedule itself for its next execution if there is one to run. For the majority of jobs, this scheduling could be many hours or days away. Due to the scheduling so far in advance, there is a chance that time drifts on the machine or even that time varies core to core so there is no guarantee that the job actually runs on or after the scheduled time. This can cause some jobs to reschedule themselves for the same scheduled time even if they ran only a millisecond prior to the scheduled time, which causes unexpected actions to be taken such as what appears as duplicated snapshots. This change resolves this by checking the triggered time against the scheduled time and using the appropriate value to ensure that we do not have unexpected job runs. Relates #63754 Backport of #64501
This commit is contained in:
parent
2dbc444fe5
commit
4c3300bf57
|
@ -188,6 +188,11 @@ public class SchedulerEngine {
|
|||
}
|
||||
}
|
||||
|
||||
// for testing
|
||||
ActiveSchedule getSchedule(String jobId) {
|
||||
return schedules.get(jobId);
|
||||
}
|
||||
|
||||
class ActiveSchedule implements Runnable {
|
||||
|
||||
private final String name;
|
||||
|
@ -195,7 +200,7 @@ public class SchedulerEngine {
|
|||
private final long startTime;
|
||||
|
||||
private ScheduledFuture<?> future;
|
||||
private long scheduledTime;
|
||||
private long scheduledTime = -1;
|
||||
|
||||
ActiveSchedule(String name, Schedule schedule, long startTime) {
|
||||
this.name = name;
|
||||
|
@ -223,10 +228,10 @@ public class SchedulerEngine {
|
|||
scheduleNextRun(triggeredTime);
|
||||
}
|
||||
|
||||
private void scheduleNextRun(long currentTime) {
|
||||
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, currentTime);
|
||||
private void scheduleNextRun(long triggeredTime) {
|
||||
this.scheduledTime = computeNextScheduledTime(triggeredTime);
|
||||
if (scheduledTime != -1) {
|
||||
long delay = Math.max(0, scheduledTime - currentTime);
|
||||
long delay = Math.max(0, scheduledTime - clock.millis());
|
||||
try {
|
||||
synchronized (this) {
|
||||
if (future == null || future.isCancelled() == false) {
|
||||
|
@ -242,6 +247,28 @@ public class SchedulerEngine {
|
|||
}
|
||||
}
|
||||
|
||||
// for testing
|
||||
long getScheduledTime() {
|
||||
return scheduledTime;
|
||||
}
|
||||
|
||||
long computeNextScheduledTime(long triggeredTime) {
|
||||
// multiple time sources + multiple cpus + ntp + VMs means you can't trust time ever!
|
||||
// scheduling happens far enough in advance in most cases that time can drift and we
|
||||
// may execute at some point before the scheduled time. There can also be time differences
|
||||
// between the CPU cores and/or the clock used by the threadpool and that used by this class
|
||||
// for scheduling. Regardless, we shouldn't reschedule to execute again until after the
|
||||
// scheduled time.
|
||||
final long scheduleAfterTime;
|
||||
if (scheduledTime != -1 && triggeredTime < scheduledTime) {
|
||||
scheduleAfterTime = scheduledTime;
|
||||
} else {
|
||||
scheduleAfterTime = triggeredTime;
|
||||
}
|
||||
|
||||
return schedule.nextScheduledTimeAfter(startTime, scheduleAfterTime);
|
||||
}
|
||||
|
||||
public synchronized void cancel() {
|
||||
FutureUtils.cancel(future);
|
||||
}
|
||||
|
|
|
@ -11,10 +11,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.ActiveSchedule;
|
||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -184,6 +186,27 @@ public class SchedulerEngineTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testNextScheduledTimeAfterCurrentScheduledTime() throws Exception {
|
||||
final Clock clock = Clock.fixed(Clock.systemUTC().instant(), ZoneId.of("UTC"));
|
||||
final long oneHourMillis = TimeUnit.HOURS.toMillis(1L);
|
||||
final String jobId = randomAlphaOfLength(4);
|
||||
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, clock);
|
||||
try {
|
||||
engine.add(new Job(jobId, ((startTime, now) -> now + oneHourMillis)));
|
||||
|
||||
ActiveSchedule activeSchedule = engine.getSchedule(jobId);
|
||||
assertNotNull(activeSchedule);
|
||||
assertEquals(clock.millis() + oneHourMillis, activeSchedule.getScheduledTime());
|
||||
|
||||
assertEquals(clock.millis() + oneHourMillis + oneHourMillis,
|
||||
activeSchedule.computeNextScheduledTime(clock.millis() - randomIntBetween(1, 999)));
|
||||
assertEquals(clock.millis() + oneHourMillis + oneHourMillis,
|
||||
activeSchedule.computeNextScheduledTime(clock.millis() + TimeUnit.SECONDS.toMillis(10L)));
|
||||
} finally {
|
||||
engine.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
|
||||
final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
|
||||
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||
|
|
Loading…
Reference in New Issue