[NIFI-3242] Avoid double scheduling of a task due to quartz imprecision

This closes #2789.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Marco Gaido 2018-06-06 14:45:04 +02:00 committed by Mark Payne
parent 8370a56dce
commit d98d335497
1 changed files with 32 additions and 10 deletions

View File

@ -77,7 +77,13 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
final AtomicBoolean canceled = new AtomicBoolean(false);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() - System.currentTimeMillis();
final Runnable command = new Runnable() {
private Date nextSchedule = initialDate;
@Override
public void run() {
if (canceled.get()) {
@ -90,10 +96,10 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
return;
}
final Date date = cronExpression.getTimeAfter(new Date());
final long delay = date.getTime() - System.currentTimeMillis();
nextSchedule = getNextSchedule(nextSchedule, cronExpression);
final long delay = getDelay(nextSchedule);
logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, date, delay);
logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, nextSchedule, delay);
flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
}
};
@ -102,8 +108,6 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
triggers.add(canceled);
canceledTriggers.put(taskNode, triggers);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() - System.currentTimeMillis();
flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
scheduleState.setScheduled(true);
logger.info("Scheduled Reporting Task {} to run threads on schedule {}", taskNode, cronSchedule);
@ -129,7 +133,14 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor);
final AtomicBoolean canceled = new AtomicBoolean(false);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() - System.currentTimeMillis();
final Runnable command = new Runnable() {
private Date nextSchedule = initialDate;
@Override
public void run() {
if (canceled.get()) {
@ -148,16 +159,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
return;
}
final Date date = cronExpression.getTimeAfter(new Date());
final long delay = date.getTime() - System.currentTimeMillis();
nextSchedule = getNextSchedule(nextSchedule, cronExpression);
final long delay = getDelay(nextSchedule);
logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, date, delay);
logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, nextSchedule, delay);
flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
}
};
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() - System.currentTimeMillis();
flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
triggers.add(canceled);
}
@ -222,4 +232,16 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
flowEngine.setCorePoolSize(corePoolSize + toAdd);
}
private static Date getNextSchedule(final Date currentSchedule, final CronExpression cronExpression) {
// Since the clock has not a millisecond precision, we have to check that we
// schedule the next time after the time this was supposed to run, otherwise
// we might end up with running the same task twice
final Date now = new Date();
return cronExpression.getTimeAfter(now.after(currentSchedule) ? now : currentSchedule);
}
private static long getDelay(Date nextSchedule) {
return Math.max(nextSchedule.getTime() - System.currentTimeMillis(), 0L);
}
}