From 232380dbfd59de45c4c6623f141d6e7052c367f9 Mon Sep 17 00:00:00 2001 From: Mike Moser Date: Fri, 19 May 2017 19:48:10 +0000 Subject: [PATCH] NIFI-1452 on timer-driven yield, use the greater of yield duration or run schedule This closes #1832. --- .../controller/scheduling/TimerDrivenSchedulingAgent.java | 8 +++++--- .../controller/tasks/ContinuallyRunProcessorTask.java | 5 ++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index fcd901f4a4..a82fde4859 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -139,8 +139,10 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { // If the component is yielded, cancel its future and re-submit it to run again // after the yield has expired. final long newYieldExpiration = connectable.getYieldExpiration(); - if (newYieldExpiration > System.currentTimeMillis()) { - final long yieldMillis = newYieldExpiration - System.currentTimeMillis(); + final long now = System.currentTimeMillis(); + if (newYieldExpiration > now) { + final long yieldMillis = newYieldExpiration - now; + final long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS); final ScheduledFuture scheduledFuture = futureRef.get(); if (scheduledFuture == null) { return; @@ -150,7 +152,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { // an accurate accounting of which futures are outstanding; we must then also update the futureRef // so that we can do this again the next time that the component is yielded. if (scheduledFuture.cancel(false)) { - final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis); + final long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis)); synchronized (scheduleState) { if (scheduleState.isScheduled()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 01f3c8cb0a..f2a7eee13b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -75,7 +75,10 @@ public class ContinuallyRunProcessorTask implements Callable { } static boolean isYielded(final ProcessorNode procNode) { - return procNode.getYieldExpiration() >= System.currentTimeMillis(); + // after one yield period, the scheduling agent could call this again when + // yieldExpiration == currentTime, and we don't want that to still be considered 'yielded' + // so this uses ">" instead of ">=" + return procNode.getYieldExpiration() > System.currentTimeMillis(); } static boolean isWorkToDo(final ProcessorNode procNode) {