diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java index bf608dba0c..f457a9c991 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java @@ -37,5 +37,7 @@ public enum ScheduledState { STARTING, - STOPPING; + STOPPING, + + RUN_ONCE; } diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 88d9d7fd7f..0e85377d00 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -300,6 +300,7 @@ While the options available from the context menu vary, the following options ar NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it is possible to open the configuration dialog by double-clicking on the desired component. - *Start* or *Stop*: This option allows the user to start or stop a Processor; the option will be either Start or Stop, depending on the current state of the Processor. +- *Run Once*: This option allows the user to run a selected Processor exactly once. If the Processor is prevented from executing (e.g. there are no incoming flow files or the outgoing connection has backpressure applied) the Processor won't get triggered. *Execution* settings apply - i.e. *Primary Node* and *All Nodes* setting will result in running the Processor only once on the Primary Node or one time on each of the nodes, respectively. Works only with *Timer Driven* and *CRON driven* Scheduling Strategy. - *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor. - *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <>). - *View status history*: This option opens a graphical representation of the Processor's statistical information over time. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusEntity.java index 4f8ef7428f..44323a3261 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusEntity.java @@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlType; @XmlType(name = "processorRunStatus") public class ProcessorRunStatusEntity extends ComponentRunStatusEntity { - private static String[] SUPPORTED_STATE = {"RUNNING", "STOPPED", "DISABLED"}; + private static String[] SUPPORTED_STATE = {"RUNNING", "RUN_ONCE", "STOPPED", "DISABLED"}; @Override protected String[] getSupportedState() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 4281e5cb74..4dd15b1b66 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1358,6 +1358,32 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier processContextFactory, final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) { + run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, failIfStopping, ScheduledState.RUNNING, ScheduledState.STARTING); + } + + /** + * Similar to {@link #start(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback, boolean)}, except for the following: + * + */ + @Override + public void runOnce(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier processContextFactory, + final SchedulingAgentCallback schedulingAgentCallback) { + + run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, true, ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE); + } + + private void run(ScheduledExecutorService taskScheduler, long administrativeYieldMillis, long timeoutMillis, Supplier processContextFactory, + SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredSate, ScheduledState scheduledState) { + final Processor processor = processorRef.get().getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); LOG.info("Starting {}", this); @@ -1368,12 +1394,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable currentState = this.scheduledState.get(); if (currentState == ScheduledState.STOPPED) { - starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING); + starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState); if (starting) { - desiredState = ScheduledState.RUNNING; + desiredState = desiredSate; } } else if (currentState == ScheduledState.STOPPING && !failIfStopping) { - desiredState = ScheduledState.RUNNING; + desiredState = desiredSate; return; } else { starting = false; @@ -1385,7 +1411,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } else { final String procName = processorRef.get().toString(); LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState); - procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[] {procName, currentState}); + procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[]{procName, currentState}); } } @@ -1474,7 +1500,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void verifyCanTerminate() { - if (getScheduledState() != ScheduledState.STOPPED) { + if (getScheduledState() != ScheduledState.STOPPED && getScheduledState() != ScheduledState.RUN_ONCE) { throw new IllegalStateException("Processor is not stopped"); } } @@ -1529,7 +1555,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable deactivateThread(); } - if (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { + if ( + (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) + || (desiredState == ScheduledState.RUN_ONCE && scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE)) + ) { LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor); schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle } else { @@ -1576,7 +1605,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } // make sure we only continue retry loop if STOP action wasn't initiated - if (scheduledState.get() != ScheduledState.STOPPING) { + if (scheduledState.get() != ScheduledState.STOPPING && scheduledState.get() != ScheduledState.RUN_ONCE) { // re-initiate the entire process final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback); taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS); @@ -1605,7 +1634,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return; } - monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); + monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); } }; @@ -1648,7 +1677,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable desiredState = ScheduledState.STOPPED; final CompletableFuture future = new CompletableFuture<>(); - if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once + // will ensure that the Processor represented by this node can only be stopped once + if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING) || this.scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.STOPPING)) { scheduleState.incrementActiveThreadCount(null); // will continue to monitor active threads, invoking OnStopped once there are no diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 294faa5aab..31e2084b2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -159,6 +159,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -1469,6 +1470,31 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @Override + public Future runProcessorOnce(ProcessorNode processor, Callable> stopCallback) { + readLock.lock(); + try { + if (getProcessor(processor.getIdentifier()) == null) { + throw new IllegalStateException("Processor is not a member of this Process Group"); + } + + final ScheduledState state = processor.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("Processor is disabled"); + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("Processor is already running"); + } + processor.reloadAdditionalResourcesIfNecessary(); + + return scheduler.runProcessorOnce(processor, stopCallback); + } catch (Exception e) { + processor.getLogger().error("Error while running processor {} once.", new Object[]{processor}, e); + return stopProcessor(processor); + } finally { + readLock.unlock(); + } + } + @Override public void startInputPort(final Port port) { readLock.lock(); @@ -1557,7 +1583,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final ScheduledState state = processor.getScheduledState(); - if (state != ScheduledState.STOPPED) { + if (state != ScheduledState.STOPPED && state != ScheduledState.RUN_ONCE) { throw new IllegalStateException("Cannot terminate processor with ID " + processor.getIdentifier() + " because it is not stopped"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index c64f8b2192..c0923b5b6a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.controller; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -27,12 +34,6 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.TerminatedTaskException; import org.apache.nifi.scheduling.SchedulingStrategy; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; - public interface ProcessScheduler { /** @@ -59,6 +60,18 @@ public interface ProcessScheduler { */ Future startProcessor(ProcessorNode procNode, boolean failIfStopping); + /** + * Starts scheduling the given processor to run once, after invoking all methods + * on the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that + * are annotated with the {@link org.apache.nifi.annotation.lifecycle.OnScheduled} annotation. If the Processor + * is already scheduled to run, does nothing. + * + * @param procNode to start + * @param stopCallback The callback that is responsible to handle the stopping of the processor after it has run once. + * @throws IllegalStateException if the Processor is disabled + */ + Future runProcessorOnce(ProcessorNode procNode, Callable> stopCallback); + /** * Stops scheduling the given processor to run and invokes all methods on * the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 77308cbbc8..38a5120c09 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -199,6 +199,30 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con public abstract void start(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, Supplier processContextFactory, SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping); + /** + * Will run the {@link Processor} represented by this + * {@link ProcessorNode} once. This typically means invoking its + * operation that is annotated with @OnScheduled and then executing a + * callback provided by the {@link ProcessScheduler} to which typically + * initiates + * {@link Processor#onTrigger(ProcessContext, org.apache.nifi.processor.ProcessSessionFactory)} + * cycle and schedules the stopping of the processor right away. + * @param scheduler + * implementation of {@link ScheduledExecutorService} used to + * initiate processor start task + * @param administrativeYieldMillis + * the amount of milliseconds to wait for administrative yield + * @param timeoutMillis the number of milliseconds to wait after triggering the Processor's @OnScheduled methods before timing out and considering +* the startup a failure. This will result in the thread being interrupted and trying again. + * @param processContextFactory +* a factory for creating instances of {@link ProcessContext} + * @param schedulingAgentCallback +* the callback provided by the {@link ProcessScheduler} to +* execute upon successful start of the Processor + */ + public abstract void runOnce(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, Supplier processContextFactory, + SchedulingAgentCallback schedulingAgentCallback); + /** * Will stop the {@link Processor} represented by this {@link ProcessorNode}. * Stopping processor typically means invoking its operation that is diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/SchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/SchedulingAgent.java index 8dcdaa5a9f..8a0c01e758 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/SchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/SchedulingAgent.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.scheduling; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Connectable; @@ -25,6 +27,8 @@ public interface SchedulingAgent { void schedule(Connectable connectable, LifecycleState scheduleState); + void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback); + void unschedule(Connectable connectable, LifecycleState scheduleState); void onEvent(Connectable connectable); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index dc37634f57..519afd00d0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.function.Predicate; @@ -205,6 +206,16 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi */ Future startProcessor(ProcessorNode processor, boolean failIfStopping); + /** + * Runs the given Processor once and the stops it by calling the provided callback. + * + * @param processor the processor to start + * @param stopCallback the callback responsible for stopping the processor + * @throws IllegalStateException if the processor is not valid, or is + * already running + */ + Future runProcessorOnce(ProcessorNode processor, Callable> stopCallback); + /** * Starts the given Input Port * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 7bb5052696..f4caec158b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -916,9 +916,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false); break; case STOPPED: + case RUN_ONCE: if (procState == ScheduledState.DISABLED) { procNode.getProcessGroup().enableProcessor(procNode); - } else if (procState == ScheduledState.RUNNING) { + } else if (procState == ScheduledState.RUNNING || procState == ScheduledState.RUN_ONCE) { controller.stopProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier()); } break; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java index d6380ec71c..3e7461dbaa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java @@ -20,6 +20,9 @@ import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.engine.FlowEngine; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + /** * Base implementation of the {@link SchedulingAgent} which encapsulates the * updates to the {@link LifecycleState} based on invoked operation and then @@ -46,6 +49,12 @@ abstract class AbstractSchedulingAgent implements SchedulingAgent { this.doSchedule(connectable, scheduleState); } + @Override + public void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback) { + scheduleState.setScheduled(true); + this.doScheduleOnce(connectable, scheduleState, stopCallback); + } + @Override public void unschedule(Connectable connectable, LifecycleState scheduleState) { scheduleState.setScheduled(false); @@ -75,6 +84,18 @@ abstract class AbstractSchedulingAgent implements SchedulingAgent { */ protected abstract void doSchedule(Connectable connectable, LifecycleState scheduleState); + /** + * Schedules the provided {@link Connectable} to run once and then calls the provided stopCallback to stop it. + * Its {@link LifecycleState} will be set to true + * + * @param connectable + * the instance of {@link Connectable} + * @param scheduleState + * the instance of {@link LifecycleState} + * @param stopCallback the callback responsible for stopping connectable after it ran once + */ + protected abstract void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback); + /** * Unschedules the provided {@link Connectable}. Its {@link LifecycleState} * will be set to false diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java new file mode 100644 index 0000000000..8998fbdf7e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.tasks.ConnectableTask; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractTimeBasedSchedulingAgent extends AbstractSchedulingAgent { + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + protected final FlowController flowController; + protected final RepositoryContextFactory contextFactory; + protected final StringEncryptor encryptor; + + protected volatile String adminYieldDuration = "1 sec"; + + public AbstractTimeBasedSchedulingAgent( + final FlowEngine flowEngine, + final FlowController flowController, + final RepositoryContextFactory contextFactory, + final StringEncryptor encryptor + ) { + super(flowEngine); + this.flowController = flowController; + this.contextFactory = contextFactory; + this.encryptor = encryptor; + } + + @Override + public void doScheduleOnce(final Connectable connectable, final LifecycleState scheduleState, Callable> stopCallback) { + final List> futures = new ArrayList<>(); + final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor); + + final Runnable trigger = () -> { + connectableTask.invoke(); + try { + stopCallback.call(); + } catch (Exception e) { + String errorMessage = "Error while stopping " + connectable + " after running once."; + logger.error(errorMessage, e); + throw new ProcessException(errorMessage, e); + } + }; + + final ScheduledFuture future = flowEngine.schedule(trigger, 1, TimeUnit.NANOSECONDS); + futures.add(future); + + scheduleState.setFutures(futures); + } + + @Override + public void setAdministrativeYieldDuration(final String yieldDuration) { + this.adminYieldDuration = yieldDuration; + } + + @Override + public String getAdministrativeYieldDuration() { + return adminYieldDuration; + } + + @Override + public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit); + } + + @Override + public void incrementMaxThreadCount(int toAdd) { + final int corePoolSize = flowEngine.getCorePoolSize(); + if (toAdd < 0 && corePoolSize + toAdd < 1) { + throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool"); + } + + flowEngine.setCorePoolSize(corePoolSize + toAdd); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 5e38337c98..351e014e88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -51,8 +51,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -125,6 +127,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { scheduleStates.put(connectable, scheduleState); } + @Override + protected void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback) { + throw new UnsupportedOperationException(); + } + @Override public void doUnschedule(final Connectable connectable, final LifecycleState scheduleState) { workerQueue.suspendWork(connectable); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index 0f73c0e3ad..9bc987d30b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -24,10 +24,7 @@ import org.apache.nifi.controller.tasks.ReportingTaskWrapper; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.FormatUtils; import org.quartz.CronExpression; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; @@ -37,22 +34,11 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -public class QuartzSchedulingAgent extends AbstractSchedulingAgent { - - private final Logger logger = LoggerFactory.getLogger(QuartzSchedulingAgent.class); - - private final FlowController flowController; - private final RepositoryContextFactory contextFactory; - private final StringEncryptor encryptor; - - private volatile String adminYieldDuration = "1 sec"; +public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent { private final Map> canceledTriggers = new HashMap<>(); public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory, final StringEncryptor enryptor) { - super(flowEngine); - this.flowController = flowController; - this.contextFactory = contextFactory; - this.encryptor = enryptor; + super(flowEngine, flowController, contextFactory, enryptor); } @Override @@ -208,31 +194,6 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { public void setMaxThreadCount(final int maxThreads) { } - @Override - public void setAdministrativeYieldDuration(final String yieldDuration) { - this.adminYieldDuration = yieldDuration; - } - - @Override - public String getAdministrativeYieldDuration() { - return adminYieldDuration; - } - - @Override - public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit); - } - - @Override - public void incrementMaxThreadCount(int toAdd) { - final int corePoolSize = flowEngine.getCorePoolSize(); - if (toAdd < 0 && corePoolSize + toAdd < 1) { - throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool"); - } - - flowEngine.setCorePoolSize(corePoolSize + toAdd); - } - private static Date getNextSchedule(final Date currentSchedule, final CronExpression cronExpression) { // Since the clock has not a millisecond precision, we have to check that we // schedule the next time after the time this was supposed to run, otherwise diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index ae40e5bd97..28e00f22b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -352,6 +352,47 @@ public final class StandardProcessScheduler implements ProcessScheduler { return future; } + /** + * Runs the given {@link Processor} once by invoking its + * {@link ProcessorNode#runOnce(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback)} + * method. + * + * @see ProcessorNode#runOnce(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback) + */ + @Override + public Future runProcessorOnce(ProcessorNode procNode, final Callable> stopCallback) { + final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true); + + final Supplier processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(), + this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController); + + final CompletableFuture future = new CompletableFuture<>(); + final SchedulingAgentCallback callback = new SchedulingAgentCallback() { + @Override + public void trigger() { + lifecycleState.clearTerminationFlag(); + getSchedulingAgent(procNode).scheduleOnce(procNode, lifecycleState, stopCallback); + future.complete(null); + } + + @Override + public Future scheduleTask(final Callable task) { + lifecycleState.incrementActiveThreadCount(null); + return componentLifeCycleThreadPool.submit(task); + } + + @Override + public void onTaskComplete() { + lifecycleState.decrementActiveThreadCount(null); + } + }; + + LOG.info("Running once {}", procNode); + procNode.runOnce(componentMonitoringThreadPool, administrativeYieldMillis, processorStartTimeoutMillis, processContextFactory, callback); + + return future; + } + /** * Stops the given {@link Processor} by invoking its * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)} @@ -372,7 +413,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public synchronized void terminateProcessor(final ProcessorNode procNode) { - if (procNode.getScheduledState() != ScheduledState.STOPPED) { + if (procNode.getScheduledState() != ScheduledState.STOPPED && procNode.getScheduledState() != ScheduledState.RUN_ONCE) { throw new IllegalStateException("Cannot terminate " + procNode + " because it is not currently stopped"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index f7e496327d..f4df9e3c5a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -26,8 +26,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -35,23 +33,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { - - private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class); +public class TimerDrivenSchedulingAgent extends AbstractTimeBasedSchedulingAgent { private final long noWorkYieldNanos; - private final FlowController flowController; - private final RepositoryContextFactory contextFactory; - private final StringEncryptor encryptor; - - private volatile String adminYieldDuration = "1 sec"; - public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory, final StringEncryptor encryptor, final NiFiProperties nifiProperties) { - super(flowEngine); - this.flowController = flowController; - this.contextFactory = contextFactory; - this.encryptor = encryptor; + super(flowEngine, flowController, contextFactory, encryptor); final String boredYieldDuration = nifiProperties.getBoredYieldDuration(); try { @@ -106,7 +93,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { logger.info("Scheduled {} to run with {} threads", connectable, connectable.getMaxConcurrentTasks()); } - private Runnable createTrigger(final ConnectableTask connectableTask, final LifecycleState scheduleState, final AtomicReference> futureRef) { final Connectable connectable = connectableTask.getConnectable(); final Runnable yieldDetectionRunnable = new Runnable() { @@ -196,21 +182,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { logger.info("Stopped scheduling {} to run", taskNode.getReportingTask()); } - @Override - public void setAdministrativeYieldDuration(final String yieldDuration) { - this.adminYieldDuration = yieldDuration; - } - - @Override - public String getAdministrativeYieldDuration() { - return adminYieldDuration; - } - - @Override - public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit); - } - @Override public void onEvent(final Connectable connectable) { } @@ -218,14 +189,4 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { @Override public void setMaxThreadCount(final int maxThreads) { } - - @Override - public void incrementMaxThreadCount(int toAdd) { - final int corePoolSize = flowEngine.getCorePoolSize(); - if (toAdd < 0 && corePoolSize + toAdd < 1) { - throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool"); - } - - flowEngine.setCorePoolSize(corePoolSize + toAdd); - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java index 8fe3a59372..1f1bea8ed6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java @@ -208,7 +208,7 @@ public class ConnectableTask { final String originalThreadName = Thread.currentThread().getName(); try { try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) { - boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING; + boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING || connectable.getScheduledState() == ScheduledState.RUN_ONCE; while (shouldRun) { invocationCount++; connectable.onTrigger(processContext, activeSessionFactory); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index b9ee11c950..c27c78a78f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -59,7 +59,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.function.Predicate; public class MockProcessGroup implements ProcessGroup { @@ -174,6 +176,11 @@ public class MockProcessGroup implements ProcessGroup { return CompletableFuture.completedFuture(null); } + @Override + public Future runProcessorOnce(ProcessorNode processor, Callable> stopCallback) { + return CompletableFuture.completedFuture(null); + } + @Override public void startInputPort(final Port port) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index 900d1e2208..2f743891b7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -467,6 +467,9 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { case DISABLED: parentGroup.disableProcessor(processor); break; + case RUN_ONCE: + parentGroup.runProcessorOnce(processor, () -> parentGroup.stopProcessor(processor)); + break; } } catch (IllegalStateException | ComponentLifeCycleException ise) { throw new NiFiCoreException(ise.getMessage(), ise); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index d09ac94f24..7cc655367e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -702,6 +702,46 @@ } }, + /** + * Runs a processor once. + * + * @argument {selection} selection The selection + */ + runOnce: function (selection) { + var componentsToRunOnce = selection.filter(function (d) { + return nfCanvasUtils.isRunnable(d3.select(this)); + }); + + // ensure there are startable components selected + if (!componentsToRunOnce.empty()) { + var requests = []; + + // start each selected component + componentsToRunOnce.each(function (d) { + var selected = d3.select(this); + + // prepare the request + var uri, entity; + uri = d.uri + '/run-status'; + entity = { + 'revision': nfClient.getRevision(d), + 'state': 'RUN_ONCE' + }; + + requests.push(updateResource(uri, entity).done(function (response) { + nfCanvasUtils.getComponentByType(d.type).set(response); + })); + }); + + // inform Angular app once the updates have completed + if (requests.length > 0) { + $.when.apply(window, requests).always(function () { + nfNgBridge.digest(); + }); + } + } + }, + /** * Stops the components in the specified selection. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js index 30412ea38b..986672dd2c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js @@ -429,6 +429,20 @@ return nfCanvasUtils.isProcessGroup(selection); }; + /** + * Determines whether the current selection is a processor. + * + * @param {selection} selection + */ + var isRunnableProcessor = function (selection) { + // ensure the correct number of components are selected + if (selection.size() !== 1) { + return false; + } + + return isRunnable(selection) && nfCanvasUtils.isProcessor(selection); + }; + /** * Returns whether the process group supports downloading the current flow. * @@ -804,6 +818,7 @@ {separator: true}, {id: 'start-menu-item', condition: isRunnable, menuItem: {clazz: 'fa fa-play', text: 'Start', action: 'start'}}, {id: 'stop-menu-item', condition: isStoppable, menuItem: {clazz: 'fa fa-stop', text: 'Stop', action: 'stop'}}, + {id: 'run-once-menu-item', condition: isRunnableProcessor, menuItem: {clazz: 'fa fa-caret-right', text: 'Run Once', action: 'runOnce'}}, {id: 'terminate-menu-item', condition: canTerminate, menuItem: {clazz: 'fa fa-hourglass-end', text: 'Terminate', action: 'terminate'}}, {id: 'enable-menu-item', condition: canEnable, menuItem: {clazz: 'fa fa-flash', text: 'Enable', action: 'enable'}}, {id: 'disable-menu-item', condition: canDisable, menuItem: {clazz: 'icon icon-enable-false', text: 'Disable', action: 'disable'}}, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index ffd3287234..cd2afb2b66 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -139,6 +139,11 @@ public class StatelessProcessScheduler implements ProcessScheduler { } + @Override + public Future runProcessorOnce(ProcessorNode procNode, final Callable> stopCallback) { + throw new UnsupportedOperationException(); + } + @Override public Future stopProcessor(final ProcessorNode procNode) { logger.info("Stopping {}", procNode); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java index 6e891b6ddc..c77204798f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessSchedulingAgent.java @@ -28,6 +28,8 @@ import org.apache.nifi.processor.SimpleProcessLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class StatelessSchedulingAgent implements SchedulingAgent { @@ -42,6 +44,10 @@ public class StatelessSchedulingAgent implements SchedulingAgent { public void schedule(final Connectable connectable, final LifecycleState scheduleState) { } + @Override + public void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback) { + } + @Override public void unschedule(final Connectable connectable, final LifecycleState scheduleState) { } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java new file mode 100644 index 0000000000..3d2507f928 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.processor; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class RunOnceIT extends NiFiSystemIT { + + @Test + public void testRunOnce() throws NiFiClientException, IOException, InterruptedException { + // GIVEN + ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec"); + + ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + ConnectionEntity generateToTerminate = getClientUtil().createConnection(generate, terminate, "success"); + + // WHEN + getNifiClient().getProcessorClient().runProcessorOnce(generate); + + // THEN + waitForQueueCount(generateToTerminate.getId(), 1); + + ProcessorEntity actualGenerate = getNifiClient().getProcessorClient().getProcessor(generate.getId()); + String actualRunStatus = actualGenerate.getStatus().getRunStatus(); + + assertEquals("Stopped", actualRunStatus); + assertEquals(1, getConnectionQueueSize(generateToTerminate.getId())); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java index bcce57c409..ea89d06393 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java @@ -31,6 +31,10 @@ public interface ProcessorClient { ProcessorEntity startProcessor(ProcessorEntity processorEntity) throws NiFiClientException, IOException; + ProcessorEntity runProcessorOnce(String processorId, String clientId, long version) throws NiFiClientException, IOException; + + ProcessorEntity runProcessorOnce(ProcessorEntity processorEntity) throws NiFiClientException, IOException; + ProcessorEntity stopProcessor(String processorId, String clientId, long version) throws NiFiClientException, IOException; ProcessorEntity stopProcessor(ProcessorEntity processorEntity) throws NiFiClientException, IOException; diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java index 0acd1e875d..467ede1bd3 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java @@ -103,6 +103,16 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce return startProcessor(processorEntity.getId(), processorEntity.getRevision().getClientId(), processorEntity.getRevision().getVersion()); } + @Override + public ProcessorEntity runProcessorOnce(final String processorId, final String clientId, final long version) throws NiFiClientException, IOException { + return updateProcessorState(processorId, "RUN_ONCE", clientId, version); + } + + @Override + public ProcessorEntity runProcessorOnce(ProcessorEntity processorEntity) throws NiFiClientException, IOException { + return runProcessorOnce(processorEntity.getId(), processorEntity.getRevision().getClientId(), processorEntity.getRevision().getVersion()); + } + @Override public ProcessorEntity stopProcessor(final String processorId, final String clientId, final long version) throws NiFiClientException, IOException { return updateProcessorState(processorId, "STOPPED", clientId, version);