mirror of https://github.com/apache/nifi.git
NIFI-362: Ensure that we synchronize on ScheduleState before modifying it
This commit is contained in:
parent
4cc106a54d
commit
f1e74cc041
|
@ -132,8 +132,13 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
|
||||||
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
|
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
|
||||||
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
|
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
|
||||||
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||||
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
|
||||||
futureRef.set(newFuture);
|
synchronized (scheduleState) {
|
||||||
|
if ( scheduleState.isScheduled() ) {
|
||||||
|
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
||||||
|
futureRef.set(newFuture);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if ( shouldYield ) {
|
} else if ( shouldYield ) {
|
||||||
// Component itself didn't yield but there was no work to do, so the framework will choose
|
// Component itself didn't yield but there was no work to do, so the framework will choose
|
||||||
|
@ -149,8 +154,13 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
|
||||||
if (scheduledFuture.cancel(false)) {
|
if (scheduledFuture.cancel(false)) {
|
||||||
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
|
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
|
||||||
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||||
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
|
||||||
futureRef.set(newFuture);
|
synchronized (scheduleState) {
|
||||||
|
if ( scheduleState.isScheduled() ) {
|
||||||
|
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
||||||
|
futureRef.set(newFuture);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue