mirror of https://github.com/apache/nifi.git
NIFI-1452 on timer-driven yield, use the greater of yield duration or run schedule
This closes #1832.
This commit is contained in:
parent
b12cf8a6d2
commit
232380dbfd
|
@ -139,8 +139,10 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
|
||||||
// If the component is yielded, cancel its future and re-submit it to run again
|
// If the component is yielded, cancel its future and re-submit it to run again
|
||||||
// after the yield has expired.
|
// after the yield has expired.
|
||||||
final long newYieldExpiration = connectable.getYieldExpiration();
|
final long newYieldExpiration = connectable.getYieldExpiration();
|
||||||
if (newYieldExpiration > System.currentTimeMillis()) {
|
final long now = System.currentTimeMillis();
|
||||||
final long yieldMillis = newYieldExpiration - System.currentTimeMillis();
|
if (newYieldExpiration > now) {
|
||||||
|
final long yieldMillis = newYieldExpiration - now;
|
||||||
|
final long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS);
|
||||||
final ScheduledFuture<?> scheduledFuture = futureRef.get();
|
final ScheduledFuture<?> scheduledFuture = futureRef.get();
|
||||||
if (scheduledFuture == null) {
|
if (scheduledFuture == null) {
|
||||||
return;
|
return;
|
||||||
|
@ -150,7 +152,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
|
||||||
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
|
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
|
||||||
// so that we can do this again the next time that the component is yielded.
|
// so that we can do this again the next time that the component is yielded.
|
||||||
if (scheduledFuture.cancel(false)) {
|
if (scheduledFuture.cancel(false)) {
|
||||||
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
|
final long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis));
|
||||||
|
|
||||||
synchronized (scheduleState) {
|
synchronized (scheduleState) {
|
||||||
if (scheduleState.isScheduled()) {
|
if (scheduleState.isScheduled()) {
|
||||||
|
|
|
@ -75,7 +75,10 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isYielded(final ProcessorNode procNode) {
|
static boolean isYielded(final ProcessorNode procNode) {
|
||||||
return procNode.getYieldExpiration() >= System.currentTimeMillis();
|
// after one yield period, the scheduling agent could call this again when
|
||||||
|
// yieldExpiration == currentTime, and we don't want that to still be considered 'yielded'
|
||||||
|
// so this uses ">" instead of ">="
|
||||||
|
return procNode.getYieldExpiration() > System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isWorkToDo(final ProcessorNode procNode) {
|
static boolean isWorkToDo(final ProcessorNode procNode) {
|
||||||
|
|
Loading…
Reference in New Issue