mirror of https://github.com/apache/nifi.git
NIFI-381: do not re-schedule processor to run after yield if not scheduled to run anymore
This commit is contained in:
parent
e370d7d7e3
commit
a956623ff9
|
@ -43,7 +43,6 @@ import org.apache.nifi.controller.ProcessorNode;
|
||||||
import org.apache.nifi.controller.ReportingTaskNode;
|
import org.apache.nifi.controller.ReportingTaskNode;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
import org.apache.nifi.controller.annotation.OnConfigured;
|
import org.apache.nifi.controller.annotation.OnConfigured;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
|
||||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||||
import org.apache.nifi.encrypt.StringEncryptor;
|
import org.apache.nifi.encrypt.StringEncryptor;
|
||||||
import org.apache.nifi.engine.FlowEngine;
|
import org.apache.nifi.engine.FlowEngine;
|
||||||
|
@ -374,9 +373,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state.setScheduled(false);
|
||||||
getSchedulingAgent(procNode).unschedule(procNode, state);
|
getSchedulingAgent(procNode).unschedule(procNode, state);
|
||||||
procNode.setScheduledState(ScheduledState.STOPPED);
|
procNode.setScheduledState(ScheduledState.STOPPED);
|
||||||
state.setScheduled(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final Runnable stopProcRunnable = new Runnable() {
|
final Runnable stopProcRunnable = new Runnable() {
|
||||||
|
@ -474,8 +473,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
if (!state.isScheduled()) {
|
if (!state.isScheduled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
state.setScheduled(false);
|
|
||||||
|
|
||||||
|
state.setScheduled(false);
|
||||||
getSchedulingAgent(connectable).unschedule(connectable, state);
|
getSchedulingAgent(connectable).unschedule(connectable, state);
|
||||||
|
|
||||||
if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
|
if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
|
||||||
|
|
|
@ -130,11 +130,12 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
|
||||||
// so that we can do this again the next time that the component is yielded.
|
// so that we can do this again the next time that the component is yielded.
|
||||||
if (scheduledFuture.cancel(false)) {
|
if (scheduledFuture.cancel(false)) {
|
||||||
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
|
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
|
||||||
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
|
|
||||||
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
|
||||||
|
|
||||||
synchronized (scheduleState) {
|
synchronized (scheduleState) {
|
||||||
if ( scheduleState.isScheduled() ) {
|
if ( scheduleState.isScheduled() ) {
|
||||||
|
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
|
||||||
|
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
||||||
futureRef.set(newFuture);
|
futureRef.set(newFuture);
|
||||||
}
|
}
|
||||||
|
@ -152,11 +153,11 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
|
||||||
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
|
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
|
||||||
// so that we can do this again the next time that the component is yielded.
|
// so that we can do this again the next time that the component is yielded.
|
||||||
if (scheduledFuture.cancel(false)) {
|
if (scheduledFuture.cancel(false)) {
|
||||||
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
|
|
||||||
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
|
||||||
|
|
||||||
synchronized (scheduleState) {
|
synchronized (scheduleState) {
|
||||||
if ( scheduleState.isScheduled() ) {
|
if ( scheduleState.isScheduled() ) {
|
||||||
|
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
|
||||||
|
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
scheduleState.replaceFuture(scheduledFuture, newFuture);
|
||||||
futureRef.set(newFuture);
|
futureRef.set(newFuture);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue