From d98d335497b20a0ad16934495d423d2129cd1c79 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 6 Jun 2018 14:45:04 +0200 Subject: [PATCH] [NIFI-3242] Avoid double scheduling of a task due to quartz imprecision This closes #2789. Signed-off-by: Mark Payne --- .../scheduling/QuartzSchedulingAgent.java | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index 731314f89a..43c5e56eae 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -77,7 +77,13 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState); final AtomicBoolean canceled = new AtomicBoolean(false); + final Date initialDate = cronExpression.getTimeAfter(new Date()); + final long initialDelay = initialDate.getTime() - System.currentTimeMillis(); + final Runnable command = new Runnable() { + + private Date nextSchedule = initialDate; + @Override public void run() { if (canceled.get()) { @@ -90,10 +96,10 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { return; } - final Date date = cronExpression.getTimeAfter(new Date()); - final long delay = date.getTime() - System.currentTimeMillis(); + nextSchedule = getNextSchedule(nextSchedule, cronExpression); + final long delay = getDelay(nextSchedule); - logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, date, delay); + logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, nextSchedule, delay); flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS); } }; @@ -102,8 +108,6 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { triggers.add(canceled); canceledTriggers.put(taskNode, triggers); - final Date initialDate = cronExpression.getTimeAfter(new Date()); - final long initialDelay = initialDate.getTime() - System.currentTimeMillis(); flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS); scheduleState.setScheduled(true); logger.info("Scheduled Reporting Task {} to run threads on schedule {}", taskNode, cronSchedule); @@ -129,7 +133,14 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor); final AtomicBoolean canceled = new AtomicBoolean(false); + + final Date initialDate = cronExpression.getTimeAfter(new Date()); + final long initialDelay = initialDate.getTime() - System.currentTimeMillis(); + final Runnable command = new Runnable() { + + private Date nextSchedule = initialDate; + @Override public void run() { if (canceled.get()) { @@ -148,16 +159,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { return; } - final Date date = cronExpression.getTimeAfter(new Date()); - final long delay = date.getTime() - System.currentTimeMillis(); + nextSchedule = getNextSchedule(nextSchedule, cronExpression); + final long delay = getDelay(nextSchedule); - logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, date, delay); + logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, nextSchedule, delay); flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS); } }; - final Date initialDate = cronExpression.getTimeAfter(new Date()); - final long initialDelay = initialDate.getTime() - System.currentTimeMillis(); + flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS); triggers.add(canceled); } @@ -222,4 +232,16 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { flowEngine.setCorePoolSize(corePoolSize + toAdd); } + + private static Date getNextSchedule(final Date currentSchedule, final CronExpression cronExpression) { + // Since the clock has not a millisecond precision, we have to check that we + // schedule the next time after the time this was supposed to run, otherwise + // we might end up with running the same task twice + final Date now = new Date(); + return cronExpression.getTimeAfter(now.after(currentSchedule) ? now : currentSchedule); + } + + private static long getDelay(Date nextSchedule) { + return Math.max(nextSchedule.getTime() - System.currentTimeMillis(), 0L); + } }