From 8e6649ba157f1389be4434f4f8e7fc81ae907ffc Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Mon, 4 Dec 2017 13:29:24 -0500
Subject: [PATCH] NIFI-2776: This closes #2315. When joining a cluster, if a
processor is stopping but cluster indicates that processor should be running,
cause processor to start when its last thread finishes
Signed-off-by: joewitt
---
.../nifi/controller/ProcessScheduler.java | 6 +-
.../apache/nifi/controller/ProcessorNode.java | 21 +--
.../controller/SchedulingAgentCallback.java | 4 +-
.../org/apache/nifi/groups/ProcessGroup.java | 8 +-
.../nifi/controller/FlowController.java | 8 +-
.../controller/StandardFlowSynchronizer.java | 2 +-
.../controller/StandardProcessorNode.java | 129 ++++++++++--------
.../scheduling/StandardProcessScheduler.java | 14 +-
.../StandardControllerServiceProvider.java | 2 +-
.../nifi/groups/StandardProcessGroup.java | 6 +-
.../controller/TestStandardProcessorNode.java | 6 +-
.../scheduling/TestProcessorLifecycle.java | 32 ++---
.../TestStandardProcessScheduler.java | 2 +-
.../service/mock/MockProcessGroup.java | 2 +-
.../web/dao/impl/StandardProcessGroupDAO.java | 2 +-
.../web/dao/impl/StandardProcessorDAO.java | 2 +-
16 files changed, 140 insertions(+), 106 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index c6f30b5bdc..4aa4066683 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -40,9 +40,13 @@ public interface ProcessScheduler {
* is already scheduled to run, does nothing.
*
* @param procNode to start
+ * @param failIfStopping If false
, and the Processor is in the 'STOPPING' state,
+ * then the Processor will automatically restart itself as soon as its last thread finishes. If this
+ * value is true
or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method
+ * will throw an {@link IllegalStateException}.
* @throws IllegalStateException if the Processor is disabled
*/
- Future startProcessor(ProcessorNode procNode);
+ Future startProcessor(ProcessorNode procNode, boolean failIfStopping);
/**
* Stops scheduling the given processor to run and invokes all methods on
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index ba2e59b7f1..7aad6b450a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -160,34 +160,37 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
* @param processContext
- * the instance of {@link ProcessContext} and
- * {@link ControllerServiceLookup}
+ * the instance of {@link ProcessContext}
* @param schedulingAgentCallback
* the callback provided by the {@link ProcessScheduler} to
* execute upon successful start of the Processor
+ * @param failIfStopping If false
, and the Processor is in the 'STOPPING' state,
+ * then the Processor will automatically restart itself as soon as its last thread finishes. If this
+ * value is true
or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method
+ * will throw an {@link IllegalStateException}.
*/
- public abstract void start(ScheduledExecutorService scheduler,
- long administrativeYieldMillis, T processContext, SchedulingAgentCallback schedulingAgentCallback);
+ public abstract void start(ScheduledExecutorService scheduler,
+ long administrativeYieldMillis, ProcessContext processContext, SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping);
/**
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.
* Stopping processor typically means invoking its operation that is
* annotated with @OnUnschedule and then @OnStopped.
*
- * @param scheduler
+ * @param processScheduler the ProcessScheduler that can be used to re-schedule the processor if need be
+ * @param executor
* implementation of {@link ScheduledExecutorService} used to
* initiate processor stop task
* @param processContext
- * the instance of {@link ProcessContext} and
- * {@link ControllerServiceLookup}
+ * the instance of {@link ProcessContext}
* @param schedulingAgent
* the SchedulingAgent that is responsible for managing the scheduling of the ProcessorNode
* @param scheduleState
* the ScheduleState that can be used to ensure that the running state (STOPPED, RUNNING, etc.)
* as well as the active thread counts are kept in sync
*/
- public abstract CompletableFuture stop(ScheduledExecutorService scheduler,
- T processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState);
+ public abstract CompletableFuture stop(ProcessScheduler processScheduler, ScheduledExecutorService executor,
+ ProcessContext processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState);
/**
* Will set the state of the processor to STOPPED which essentially implies
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
index 9d66e38161..31a8745e87 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
@@ -20,9 +20,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Future;
public interface SchedulingAgentCallback {
- void postMonitor();
+ void onTaskComplete();
- Future> invokeMonitoringTask(Callable> task);
+ Future> scheduleTask(Callable> task);
void trigger();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index bf789f7b1b..0baba23fba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -161,10 +161,14 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
* Starts the given Processor
*
* @param processor the processor to start
+ * @param failIfStopping If false
, and the Processor is in the 'STOPPING' state,
+ * then the Processor will automatically restart itself as soon as its last thread finishes. If this
+ * value is true
or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method
+ * will throw an {@link IllegalStateException}.
* @throws IllegalStateException if the processor is not valid, or is
- * already running
+ * already running
*/
- CompletableFuture startProcessor(ProcessorNode processor);
+ CompletableFuture startProcessor(ProcessorNode processor, boolean failIfStopping);
/**
* Starts the given Input Port
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 49c17899a2..56b2590c9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -805,7 +805,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try {
if (connectable instanceof ProcessorNode) {
- connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+ connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
} else {
startConnectable(connectable);
}
@@ -2984,6 +2984,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public void startProcessor(final String parentGroupId, final String processorId) {
+ startProcessor(parentGroupId, processorId, true);
+ }
+
+ public void startProcessor(final String parentGroupId, final String processorId, final boolean failIfStopping) {
final ProcessGroup group = lookupGroup(parentGroupId);
final ProcessorNode node = group.getProcessor(processorId);
if (node == null) {
@@ -2993,7 +2997,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
if (initialized.get()) {
- group.startProcessor(node);
+ group.startProcessor(node, failIfStopping);
} else {
startConnectablesAfterInitialization.add(node);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 3d07456e1f..3a0b093326 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -761,7 +761,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
case RUNNING:
// we want to run now. Make sure processor is not disabled and then start it.
procNode.getProcessGroup().enableProcessor(procNode);
- controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier());
+ controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false);
break;
case STOPPED:
if (procNode.getScheduledState() == ScheduledState.DISABLED) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 36cb62eb51..88912aa3c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -130,6 +130,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private long runNanos = 0L;
private volatile long yieldNanos;
private final NiFiProperties nifiProperties;
+ private volatile ScheduledState desiredState;
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
// ??????? NOT any more
@@ -1281,68 +1282,81 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
*
*/
@Override
- public void start(final ScheduledExecutorService taskScheduler,
- final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) {
+ public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final ProcessContext processContext,
+ final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {
+
if (!this.isValid()) {
throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
}
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
- final boolean starting;
+ ScheduledState currentState;
+ boolean starting;
synchronized (this) {
- starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);
+ currentState = this.scheduledState.get();
+
+ if (currentState == ScheduledState.STOPPED) {
+ starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);
+ if (starting) {
+ desiredState = ScheduledState.RUNNING;
+ }
+ } else if (currentState == ScheduledState.STOPPING && !failIfStopping) {
+ desiredState = ScheduledState.RUNNING;
+ return;
+ } else {
+ starting = false;
+ }
}
if (starting) { // will ensure that the Processor represented by this node can only be started once
- final Runnable startProcRunnable = new Runnable() {
+ taskScheduler.execute(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback));
+ } else {
+ final String procName = processorRef.get().toString();
+ LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState);
+ procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[] {procName, currentState});
+ }
+ }
+
+ private void initiateStart(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis,
+ final ProcessContext processContext, final SchedulingAgentCallback schedulingAgentCallback) {
+
+ final Processor processor = getProcessor();
+ final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
+
+ try {
+ invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable() {
@Override
- public void run() {
- try {
- invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable() {
- @Override
- public Void call() throws Exception {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
- return null;
- }
- }
- });
-
- if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
- schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
- } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
- }
- scheduledState.set(ScheduledState.STOPPED);
- }
- } catch (final Exception e) {
- final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
- procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds",
- new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause);
- LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
-
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
-
- if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated
- taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
- } else {
- scheduledState.set(ScheduledState.STOPPED);
- }
+ public Void call() throws Exception {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
+ return null;
}
}
- };
- taskScheduler.execute(startProcRunnable);
- } else {
- final String procName = processorRef.getClass().getSimpleName();
- LOG.warn("Can not start '" + procName
- + "' since it's already in the process of being started or it is DISABLED - "
- + scheduledState.get());
- procLog.warn("Can not start '" + procName
- + "' since it's already in the process of being started or it is DISABLED - "
- + scheduledState.get());
+ });
+
+ if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
+ schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
+ } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
+ }
+ scheduledState.set(ScheduledState.STOPPED);
+ }
+ } catch (final Exception e) {
+ final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+ procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds",
+ new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause);
+ LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
+
+ if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated
+ taskScheduler.schedule(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback), administrativeYieldMillis, TimeUnit.MILLISECONDS);
+ } else {
+ scheduledState.set(ScheduledState.STOPPED);
+ }
}
}
@@ -1373,11 +1387,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
*
*/
@Override
- public CompletableFuture stop(final ScheduledExecutorService scheduler,
- final T processContext, final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) {
+ public CompletableFuture stop(final ProcessScheduler processScheduler, final ScheduledExecutorService executor, final ProcessContext processContext,
+ final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) {
final Processor processor = processorRef.get().getProcessor();
LOG.info("Stopping processor: " + processor.getClass());
+ desiredState = ScheduledState.STOPPED;
final CompletableFuture future = new CompletableFuture<>();
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
@@ -1385,7 +1400,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// will continue to monitor active threads, invoking OnStopped once there are no
// active threads (with the exception of the thread performing shutdown operations)
- scheduler.execute(new Runnable() {
+ executor.execute(new Runnable() {
@Override
public void run() {
try {
@@ -1407,9 +1422,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
scheduleState.decrementActiveThreadCount();
scheduledState.set(ScheduledState.STOPPED);
future.complete(null);
+
+ if (desiredState == ScheduledState.RUNNING) {
+ processScheduler.startProcessor(StandardProcessorNode.this, true);
+ }
} else {
// Not all of the active threads have finished. Try again in 100 milliseconds.
- scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
+ executor.schedule(this, 100, TimeUnit.MILLISECONDS);
}
} catch (final Exception e) {
LOG.warn("Failed while shutting down processor " + processor, e);
@@ -1461,7 +1480,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
final long onScheduleTimeout = timeoutString == null ? 60000
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
- final Future> taskFuture = callback.invokeMonitoringTask(task);
+ final Future> taskFuture = callback.scheduleTask(task);
try {
taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
@@ -1482,7 +1501,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} catch (final ExecutionException e){
throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e);
} finally {
- callback.postMonitor();
+ callback.onTaskComplete();
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index c7f158180a..d08d701816 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -297,13 +297,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
* @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)
*/
@Override
- public synchronized CompletableFuture startProcessor(final ProcessorNode procNode) {
- StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
+ public synchronized CompletableFuture startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
+ final StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
this.encryptor, getStateManager(procNode.getIdentifier()));
final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode));
final CompletableFuture future = new CompletableFuture<>();
- SchedulingAgentCallback callback = new SchedulingAgentCallback() {
+ final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@Override
public void trigger() {
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
@@ -311,19 +311,19 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
@Override
- public Future> invokeMonitoringTask(Callable> task) {
+ public Future> scheduleTask(Callable> task) {
scheduleState.incrementActiveThreadCount();
return componentMonitoringThreadPool.submit(task);
}
@Override
- public void postMonitor() {
+ public void onTaskComplete() {
scheduleState.decrementActiveThreadCount();
}
};
LOG.info("Starting {}", procNode);
- procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback);
+ procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping);
return future;
}
@@ -341,7 +341,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final ScheduleState state = getScheduleState(procNode);
LOG.info("Stopping {}", procNode);
- return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
+ return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 02e190ac91..48ad849ab6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -285,7 +285,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// start all of the components that are not disabled
for (final ProcessorNode node : processors) {
if (node.getScheduledState() != ScheduledState.DISABLED) {
- node.getProcessGroup().startProcessor(node);
+ node.getProcessGroup().startProcessor(node, true);
updated.add(node);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1754cf7bdf..ec32cc1cc2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -320,7 +320,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> {
try {
- node.getProcessGroup().startProcessor(node);
+ node.getProcessGroup().startProcessor(node, true);
} catch (final Throwable t) {
LOG.error("Unable to start processor {} due to {}", new Object[]{node.getIdentifier(), t});
}
@@ -1092,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public CompletableFuture startProcessor(final ProcessorNode processor) {
+ public CompletableFuture startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
readLock.lock();
try {
if (getProcessor(processor.getIdentifier()) == null) {
@@ -1106,7 +1106,7 @@ public final class StandardProcessGroup implements ProcessGroup {
return CompletableFuture.completedFuture(null);
}
- return scheduler.startProcessor(processor);
+ return scheduler.startProcessor(processor, failIfStopping);
} finally {
readLock.unlock();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index 33c33c9609..7b3996347e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -105,11 +105,11 @@ public class TestStandardProcessorNode {
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null);
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
@Override
- public void postMonitor() {
+ public void onTaskComplete() {
}
@Override
- public Future> invokeMonitoringTask(final Callable> task) {
+ public Future> scheduleTask(final Callable> task) {
return taskScheduler.submit(task);
}
@@ -119,7 +119,7 @@ public class TestStandardProcessorNode {
}
};
- procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback);
+ procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback, true);
Thread.sleep(1000L);
assertEquals(1, processor.onScheduledCount);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index f8f0426b64..b55e98da3c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -160,7 +160,7 @@ public class TestProcessorLifecycle {
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
}
@@ -184,9 +184,9 @@ public class TestProcessorLifecycle {
this.noop(testProcessor);
final ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
- ps.startProcessor(testProcNode);
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
+ ps.startProcessor(testProcNode, true);
+ ps.startProcessor(testProcNode, true);
Thread.sleep(500);
assertCondition(() -> testProcessor.operationNames.size() == 1);
@@ -302,7 +302,7 @@ public class TestProcessorLifecycle {
@Override
public void run() {
LockSupport.parkNanos(random.nextInt(9000000));
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
countDownCounter.countDown();
}
});
@@ -342,7 +342,7 @@ public class TestProcessorLifecycle {
this.longRunningOnSchedule(testProcessor, delay);
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L);
ps.stopProcessor(testProcNode);
@@ -375,7 +375,7 @@ public class TestProcessorLifecycle {
testProcessor.keepFailingOnScheduledTimes = 2;
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L);
ps.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
@@ -404,7 +404,7 @@ public class TestProcessorLifecycle {
testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
ps.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
@@ -429,7 +429,7 @@ public class TestProcessorLifecycle {
this.blockingInterruptableOnUnschedule(testProcessor);
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
ps.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
@@ -454,7 +454,7 @@ public class TestProcessorLifecycle {
this.blockingUninterruptableOnUnschedule(testProcessor);
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L);
ps.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L);
@@ -481,7 +481,7 @@ public class TestProcessorLifecycle {
testProcessor.generateExceptionOnTrigger = true;
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
ps.disableProcessor(testProcNode);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
@@ -503,7 +503,7 @@ public class TestProcessorLifecycle {
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
fail();
}
@@ -531,7 +531,7 @@ public class TestProcessorLifecycle {
testProcessor.withService = true;
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
fail();
}
@@ -563,7 +563,7 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler();
ps.enableControllerService(testServiceNode);
- ps.startProcessor(testProcNode);
+ ps.startProcessor(testProcNode, true);
Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
@@ -598,8 +598,8 @@ public class TestProcessorLifecycle {
testGroup.addConnection(connection);
ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNodeA);
- ps.startProcessor(testProcNodeB);
+ ps.startProcessor(testProcNodeA, true);
+ ps.startProcessor(testProcNodeB, true);
try {
testGroup.removeProcessor(testProcNodeA);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 0c4acd80a2..314738abbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -206,7 +206,7 @@ public class TestStandardProcessScheduler {
procNode.setProperties(procProps);
scheduler.enableControllerService(service);
- scheduler.startProcessor(procNode);
+ scheduler.startProcessor(procNode, true);
Thread.sleep(1000L);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 9725ed8ac4..a28eb34e75 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -149,7 +149,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public CompletableFuture startProcessor(final ProcessorNode processor) {
+ public CompletableFuture startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
return CompletableFuture.completedFuture(null);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 258af72dda..ec584dee5c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -135,7 +135,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final Connectable connectable = group.findLocalConnectable(componentId);
if (ScheduledState.RUNNING.equals(state)) {
if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
- final CompletableFuture> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+ final CompletableFuture> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
future = CompletableFuture.allOf(future, processorFuture);
} else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
connectable.getProcessGroup().startInputPort((Port) connectable);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index 429592c51d..ffbe21cd33 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -426,7 +426,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
- parentGroup.startProcessor(processor);
+ parentGroup.startProcessor(processor, true);
break;
case STOPPED:
switch (processor.getScheduledState()) {