diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index c6f30b5bdc..4aa4066683 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -40,9 +40,13 @@ public interface ProcessScheduler { * is already scheduled to run, does nothing. * * @param procNode to start + * @param failIfStopping If false, and the Processor is in the 'STOPPING' state, + * then the Processor will automatically restart itself as soon as its last thread finishes. If this + * value is true or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method + * will throw an {@link IllegalStateException}. * @throws IllegalStateException if the Processor is disabled */ - Future startProcessor(ProcessorNode procNode); + Future startProcessor(ProcessorNode procNode, boolean failIfStopping); /** * Stops scheduling the given processor to run and invokes all methods on diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index ba2e59b7f1..7aad6b450a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -160,34 +160,37 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen * @param administrativeYieldMillis * the amount of milliseconds to wait for administrative yield * @param processContext - * the instance of {@link ProcessContext} and - * {@link ControllerServiceLookup} + * the instance of {@link ProcessContext} * @param schedulingAgentCallback * the callback provided by the {@link ProcessScheduler} to * execute upon successful start of the Processor + * @param failIfStopping If false, and the Processor is in the 'STOPPING' state, + * then the Processor will automatically restart itself as soon as its last thread finishes. If this + * value is true or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method + * will throw an {@link IllegalStateException}. */ - public abstract void start(ScheduledExecutorService scheduler, - long administrativeYieldMillis, T processContext, SchedulingAgentCallback schedulingAgentCallback); + public abstract void start(ScheduledExecutorService scheduler, + long administrativeYieldMillis, ProcessContext processContext, SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping); /** * Will stop the {@link Processor} represented by this {@link ProcessorNode}. * Stopping processor typically means invoking its operation that is * annotated with @OnUnschedule and then @OnStopped. * - * @param scheduler + * @param processScheduler the ProcessScheduler that can be used to re-schedule the processor if need be + * @param executor * implementation of {@link ScheduledExecutorService} used to * initiate processor stop task * @param processContext - * the instance of {@link ProcessContext} and - * {@link ControllerServiceLookup} + * the instance of {@link ProcessContext} * @param schedulingAgent * the SchedulingAgent that is responsible for managing the scheduling of the ProcessorNode * @param scheduleState * the ScheduleState that can be used to ensure that the running state (STOPPED, RUNNING, etc.) * as well as the active thread counts are kept in sync */ - public abstract CompletableFuture stop(ScheduledExecutorService scheduler, - T processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState); + public abstract CompletableFuture stop(ProcessScheduler processScheduler, ScheduledExecutorService executor, + ProcessContext processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState); /** * Will set the state of the processor to STOPPED which essentially implies diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java index 9d66e38161..31a8745e87 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java @@ -20,9 +20,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.Future; public interface SchedulingAgentCallback { - void postMonitor(); + void onTaskComplete(); - Future invokeMonitoringTask(Callable task); + Future scheduleTask(Callable task); void trigger(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index bf789f7b1b..0baba23fba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -161,10 +161,14 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { * Starts the given Processor * * @param processor the processor to start + * @param failIfStopping If false, and the Processor is in the 'STOPPING' state, + * then the Processor will automatically restart itself as soon as its last thread finishes. If this + * value is true or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method + * will throw an {@link IllegalStateException}. * @throws IllegalStateException if the processor is not valid, or is - * already running + * already running */ - CompletableFuture startProcessor(ProcessorNode processor); + CompletableFuture startProcessor(ProcessorNode processor, boolean failIfStopping); /** * Starts the given Input Port diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 49c17899a2..56b2590c9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -805,7 +805,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { if (connectable instanceof ProcessorNode) { - connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true); } else { startConnectable(connectable); } @@ -2984,6 +2984,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public void startProcessor(final String parentGroupId, final String processorId) { + startProcessor(parentGroupId, processorId, true); + } + + public void startProcessor(final String parentGroupId, final String processorId, final boolean failIfStopping) { final ProcessGroup group = lookupGroup(parentGroupId); final ProcessorNode node = group.getProcessor(processorId); if (node == null) { @@ -2993,7 +2997,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R writeLock.lock(); try { if (initialized.get()) { - group.startProcessor(node); + group.startProcessor(node, failIfStopping); } else { startConnectablesAfterInitialization.add(node); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 3d07456e1f..3a0b093326 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -761,7 +761,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. procNode.getProcessGroup().enableProcessor(procNode); - controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier()); + controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false); break; case STOPPED: if (procNode.getScheduledState() == ScheduledState.DISABLED) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 36cb62eb51..88912aa3c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -130,6 +130,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private long runNanos = 0L; private volatile long yieldNanos; private final NiFiProperties nifiProperties; + private volatile ScheduledState desiredState; private SchedulingStrategy schedulingStrategy; // guarded by read/write lock // ??????? NOT any more @@ -1281,68 +1282,81 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable *

*/ @Override - public void start(final ScheduledExecutorService taskScheduler, - final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) { + public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final ProcessContext processContext, + final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) { + if (!this.isValid()) { throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); } final Processor processor = processorRef.get().getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - final boolean starting; + ScheduledState currentState; + boolean starting; synchronized (this) { - starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING); + currentState = this.scheduledState.get(); + + if (currentState == ScheduledState.STOPPED) { + starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING); + if (starting) { + desiredState = ScheduledState.RUNNING; + } + } else if (currentState == ScheduledState.STOPPING && !failIfStopping) { + desiredState = ScheduledState.RUNNING; + return; + } else { + starting = false; + } } if (starting) { // will ensure that the Processor represented by this node can only be started once - final Runnable startProcRunnable = new Runnable() { + taskScheduler.execute(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback)); + } else { + final String procName = processorRef.get().toString(); + LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState); + procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[] {procName, currentState}); + } + } + + private void initiateStart(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, + final ProcessContext processContext, final SchedulingAgentCallback schedulingAgentCallback) { + + final Processor processor = getProcessor(); + final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); + + try { + invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable() { @Override - public void run() { - try { - invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable() { - @Override - public Void call() throws Exception { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { - ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); - return null; - } - } - }); - - if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { - schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle - } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); - } - scheduledState.set(ScheduledState.STOPPED); - } - } catch (final Exception e) { - final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; - procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds", - new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause); - LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); - - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); - - if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated - taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); - } else { - scheduledState.set(ScheduledState.STOPPED); - } + public Void call() throws Exception { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { + ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); + return null; } } - }; - taskScheduler.execute(startProcRunnable); - } else { - final String procName = processorRef.getClass().getSimpleName(); - LOG.warn("Can not start '" + procName - + "' since it's already in the process of being started or it is DISABLED - " - + scheduledState.get()); - procLog.warn("Can not start '" + procName - + "' since it's already in the process of being started or it is DISABLED - " - + scheduledState.get()); + }); + + if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { + schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle + } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + } + scheduledState.set(ScheduledState.STOPPED); + } + } catch (final Exception e) { + final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; + procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds", + new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause); + LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); + + if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated + taskScheduler.schedule(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback), administrativeYieldMillis, TimeUnit.MILLISECONDS); + } else { + scheduledState.set(ScheduledState.STOPPED); + } } } @@ -1373,11 +1387,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable *

*/ @Override - public CompletableFuture stop(final ScheduledExecutorService scheduler, - final T processContext, final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) { + public CompletableFuture stop(final ProcessScheduler processScheduler, final ScheduledExecutorService executor, final ProcessContext processContext, + final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) { final Processor processor = processorRef.get().getProcessor(); LOG.info("Stopping processor: " + processor.getClass()); + desiredState = ScheduledState.STOPPED; final CompletableFuture future = new CompletableFuture<>(); if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once @@ -1385,7 +1400,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // will continue to monitor active threads, invoking OnStopped once there are no // active threads (with the exception of the thread performing shutdown operations) - scheduler.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try { @@ -1407,9 +1422,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable scheduleState.decrementActiveThreadCount(); scheduledState.set(ScheduledState.STOPPED); future.complete(null); + + if (desiredState == ScheduledState.RUNNING) { + processScheduler.startProcessor(StandardProcessorNode.this, true); + } } else { // Not all of the active threads have finished. Try again in 100 milliseconds. - scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); + executor.schedule(this, 100, TimeUnit.MILLISECONDS); } } catch (final Exception e) { LOG.warn("Failed while shutting down processor " + processor, e); @@ -1461,7 +1480,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); final long onScheduleTimeout = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); - final Future taskFuture = callback.invokeMonitoringTask(task); + final Future taskFuture = callback.scheduleTask(task); try { taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { @@ -1482,7 +1501,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } catch (final ExecutionException e){ throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); } finally { - callback.postMonitor(); + callback.onTaskComplete(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index c7f158180a..d08d701816 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -297,13 +297,13 @@ public final class StandardProcessScheduler implements ProcessScheduler { * @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable) */ @Override - public synchronized CompletableFuture startProcessor(final ProcessorNode procNode) { - StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, + public synchronized CompletableFuture startProcessor(final ProcessorNode procNode, final boolean failIfStopping) { + final StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, this.encryptor, getStateManager(procNode.getIdentifier())); final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode)); final CompletableFuture future = new CompletableFuture<>(); - SchedulingAgentCallback callback = new SchedulingAgentCallback() { + final SchedulingAgentCallback callback = new SchedulingAgentCallback() { @Override public void trigger() { getSchedulingAgent(procNode).schedule(procNode, scheduleState); @@ -311,19 +311,19 @@ public final class StandardProcessScheduler implements ProcessScheduler { } @Override - public Future invokeMonitoringTask(Callable task) { + public Future scheduleTask(Callable task) { scheduleState.incrementActiveThreadCount(); return componentMonitoringThreadPool.submit(task); } @Override - public void postMonitor() { + public void onTaskComplete() { scheduleState.decrementActiveThreadCount(); } }; LOG.info("Starting {}", procNode); - procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback); + procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping); return future; } @@ -341,7 +341,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { final ScheduleState state = getScheduleState(procNode); LOG.info("Stopping {}", procNode); - return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state); + return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 02e190ac91..48ad849ab6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -285,7 +285,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi // start all of the components that are not disabled for (final ProcessorNode node : processors) { if (node.getScheduledState() != ScheduledState.DISABLED) { - node.getProcessGroup().startProcessor(node); + node.getProcessGroup().startProcessor(node, true); updated.add(node); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 1754cf7bdf..ec32cc1cc2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -320,7 +320,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> { try { - node.getProcessGroup().startProcessor(node); + node.getProcessGroup().startProcessor(node, true); } catch (final Throwable t) { LOG.error("Unable to start processor {} due to {}", new Object[]{node.getIdentifier(), t}); } @@ -1092,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public CompletableFuture startProcessor(final ProcessorNode processor) { + public CompletableFuture startProcessor(final ProcessorNode processor, final boolean failIfStopping) { readLock.lock(); try { if (getProcessor(processor.getIdentifier()) == null) { @@ -1106,7 +1106,7 @@ public final class StandardProcessGroup implements ProcessGroup { return CompletableFuture.completedFuture(null); } - return scheduler.startProcessor(processor); + return scheduler.startProcessor(processor, failIfStopping); } finally { readLock.unlock(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index 33c33c9609..7b3996347e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -105,11 +105,11 @@ public class TestStandardProcessorNode { final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null); final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() { @Override - public void postMonitor() { + public void onTaskComplete() { } @Override - public Future invokeMonitoringTask(final Callable task) { + public Future scheduleTask(final Callable task) { return taskScheduler.submit(task); } @@ -119,7 +119,7 @@ public class TestStandardProcessorNode { } }; - procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback); + procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback, true); Thread.sleep(1000L); assertEquals(1, processor.onScheduledCount); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index f8f0426b64..b55e98da3c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -160,7 +160,7 @@ public class TestProcessorLifecycle { assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); } @@ -184,9 +184,9 @@ public class TestProcessorLifecycle { this.noop(testProcessor); final ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); - ps.startProcessor(testProcNode); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); + ps.startProcessor(testProcNode, true); + ps.startProcessor(testProcNode, true); Thread.sleep(500); assertCondition(() -> testProcessor.operationNames.size() == 1); @@ -302,7 +302,7 @@ public class TestProcessorLifecycle { @Override public void run() { LockSupport.parkNanos(random.nextInt(9000000)); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); countDownCounter.countDown(); } }); @@ -342,7 +342,7 @@ public class TestProcessorLifecycle { this.longRunningOnSchedule(testProcessor, delay); ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L); ps.stopProcessor(testProcNode); @@ -375,7 +375,7 @@ public class TestProcessorLifecycle { testProcessor.keepFailingOnScheduledTimes = 2; ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L); ps.stopProcessor(testProcNode); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); @@ -404,7 +404,7 @@ public class TestProcessorLifecycle { testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE; ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); @@ -429,7 +429,7 @@ public class TestProcessorLifecycle { this.blockingInterruptableOnUnschedule(testProcessor); ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); @@ -454,7 +454,7 @@ public class TestProcessorLifecycle { this.blockingUninterruptableOnUnschedule(testProcessor); ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L); ps.stopProcessor(testProcNode); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L); @@ -481,7 +481,7 @@ public class TestProcessorLifecycle { testProcessor.generateExceptionOnTrigger = true; ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.disableProcessor(testProcNode); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); @@ -503,7 +503,7 @@ public class TestProcessorLifecycle { ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); fail(); } @@ -531,7 +531,7 @@ public class TestProcessorLifecycle { testProcessor.withService = true; ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); fail(); } @@ -563,7 +563,7 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.enableControllerService(testServiceNode); - ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode, true); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); @@ -598,8 +598,8 @@ public class TestProcessorLifecycle { testGroup.addConnection(connection); ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNodeA); - ps.startProcessor(testProcNodeB); + ps.startProcessor(testProcNodeA, true); + ps.startProcessor(testProcNodeB, true); try { testGroup.removeProcessor(testProcNodeA); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 0c4acd80a2..314738abbe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -206,7 +206,7 @@ public class TestStandardProcessScheduler { procNode.setProperties(procProps); scheduler.enableControllerService(service); - scheduler.startProcessor(procNode); + scheduler.startProcessor(procNode, true); Thread.sleep(1000L); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 9725ed8ac4..a28eb34e75 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -149,7 +149,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public CompletableFuture startProcessor(final ProcessorNode processor) { + public CompletableFuture startProcessor(final ProcessorNode processor, final boolean failIfStopping) { return CompletableFuture.completedFuture(null); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 258af72dda..ec584dee5c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -135,7 +135,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final Connectable connectable = group.findLocalConnectable(componentId); if (ScheduledState.RUNNING.equals(state)) { if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { - final CompletableFuture processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + final CompletableFuture processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true); future = CompletableFuture.allOf(future, processorFuture); } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { connectable.getProcessGroup().startInputPort((Port) connectable); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index 429592c51d..ffbe21cd33 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -426,7 +426,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { // perform the appropriate action switch (purposedScheduledState) { case RUNNING: - parentGroup.startProcessor(processor); + parentGroup.startProcessor(processor, true); break; case STOPPED: switch (processor.getScheduledState()) {