NIFI-8188 - Add 'Run Once' for processors in context menu.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Tamas Palfy 2021-02-02 14:22:11 +01:00 committed by Mark Payne
parent 6801704846
commit aa726040c5
26 changed files with 452 additions and 103 deletions

View File

@ -37,5 +37,7 @@ public enum ScheduledState {
STARTING,
STOPPING;
STOPPING,
RUN_ONCE;
}

View File

@ -300,6 +300,7 @@ While the options available from the context menu vary, the following options ar
NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it is possible to open the configuration dialog by double-clicking on the desired component.
- *Start* or *Stop*: This option allows the user to start or stop a Processor; the option will be either Start or Stop, depending on the current state of the Processor.
- *Run Once*: This option allows the user to run a selected Processor exactly once. If the Processor is prevented from executing (e.g. there are no incoming flow files or the outgoing connection has backpressure applied) the Processor won't get triggered. *Execution* settings apply - i.e. *Primary Node* and *All Nodes* setting will result in running the Processor only once on the Primary Node or one time on each of the nodes, respectively. Works only with *Timer Driven* and *CRON driven* Scheduling Strategy.
- *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor.
- *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <<data_provenance>>).
- *View status history*: This option opens a graphical representation of the Processor's statistical information over time.

View File

@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlType;
@XmlType(name = "processorRunStatus")
public class ProcessorRunStatusEntity extends ComponentRunStatusEntity {
private static String[] SUPPORTED_STATE = {"RUNNING", "STOPPED", "DISABLED"};
private static String[] SUPPORTED_STATE = {"RUNNING", "RUN_ONCE", "STOPPED", "DISABLED"};
@Override
protected String[] getSupportedState() {

View File

@ -1358,6 +1358,32 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier<ProcessContext> processContextFactory,
final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {
run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, failIfStopping, ScheduledState.RUNNING, ScheduledState.STARTING);
}
/**
* Similar to {@link #start(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback, boolean)}, except for the following:
* <ul>
* <li>
* Once the {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method has been invoked successfully, the processor is scehduled to be stopped immediately.
* All appropriate lifecycle methods will be executed as well.
* </li>
* <li>
* The processor's desired state is going to be set to STOPPED right away. This usually doesn't prevent the processor to run once, unless NiFi is restarted before it can finish.
* In that case the processor will stay STOPPED after the restart.
* </li>
* </ul>
*/
@Override
public void runOnce(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier<ProcessContext> processContextFactory,
final SchedulingAgentCallback schedulingAgentCallback) {
run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, true, ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE);
}
private void run(ScheduledExecutorService taskScheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredSate, ScheduledState scheduledState) {
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
LOG.info("Starting {}", this);
@ -1368,12 +1394,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
currentState = this.scheduledState.get();
if (currentState == ScheduledState.STOPPED) {
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState);
if (starting) {
desiredState = ScheduledState.RUNNING;
desiredState = desiredSate;
}
} else if (currentState == ScheduledState.STOPPING && !failIfStopping) {
desiredState = ScheduledState.RUNNING;
desiredState = desiredSate;
return;
} else {
starting = false;
@ -1385,7 +1411,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} else {
final String procName = processorRef.get().toString();
LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState);
procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[] {procName, currentState});
procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[]{procName, currentState});
}
}
@ -1474,7 +1500,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public void verifyCanTerminate() {
if (getScheduledState() != ScheduledState.STOPPED) {
if (getScheduledState() != ScheduledState.STOPPED && getScheduledState() != ScheduledState.RUN_ONCE) {
throw new IllegalStateException("Processor is not stopped");
}
}
@ -1529,7 +1555,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
deactivateThread();
}
if (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
if (
(desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING))
|| (desiredState == ScheduledState.RUN_ONCE && scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE))
) {
LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor);
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else {
@ -1576,7 +1605,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
// make sure we only continue retry loop if STOP action wasn't initiated
if (scheduledState.get() != ScheduledState.STOPPING) {
if (scheduledState.get() != ScheduledState.STOPPING && scheduledState.get() != ScheduledState.RUN_ONCE) {
// re-initiate the entire process
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback);
taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS);
@ -1605,7 +1634,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
return;
}
monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get());
monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get());
}
};
@ -1648,7 +1677,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
desiredState = ScheduledState.STOPPED;
final CompletableFuture<Void> future = new CompletableFuture<>();
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
// will ensure that the Processor represented by this node can only be stopped once
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING) || this.scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.STOPPING)) {
scheduleState.incrementActiveThreadCount(null);
// will continue to monitor active threads, invoking OnStopped once there are no

View File

@ -159,6 +159,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -1469,6 +1470,31 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public Future<Void> runProcessorOnce(ProcessorNode processor, Callable<Future<Void>> stopCallback) {
readLock.lock();
try {
if (getProcessor(processor.getIdentifier()) == null) {
throw new IllegalStateException("Processor is not a member of this Process Group");
}
final ScheduledState state = processor.getScheduledState();
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Processor is disabled");
} else if (state == ScheduledState.RUNNING) {
throw new IllegalStateException("Processor is already running");
}
processor.reloadAdditionalResourcesIfNecessary();
return scheduler.runProcessorOnce(processor, stopCallback);
} catch (Exception e) {
processor.getLogger().error("Error while running processor {} once.", new Object[]{processor}, e);
return stopProcessor(processor);
} finally {
readLock.unlock();
}
}
@Override
public void startInputPort(final Port port) {
readLock.lock();
@ -1557,7 +1583,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final ScheduledState state = processor.getScheduledState();
if (state != ScheduledState.STOPPED) {
if (state != ScheduledState.STOPPED && state != ScheduledState.RUN_ONCE) {
throw new IllegalStateException("Cannot terminate processor with ID " + processor.getIdentifier() + " because it is not stopped");
}

View File

@ -16,6 +16,13 @@
*/
package org.apache.nifi.controller;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@ -27,12 +34,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.scheduling.SchedulingStrategy;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public interface ProcessScheduler {
/**
@ -59,6 +60,18 @@ public interface ProcessScheduler {
*/
Future<Void> startProcessor(ProcessorNode procNode, boolean failIfStopping);
/**
* Starts scheduling the given processor to run once, after invoking all methods
* on the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that
* are annotated with the {@link org.apache.nifi.annotation.lifecycle.OnScheduled} annotation. If the Processor
* is already scheduled to run, does nothing.
*
* @param procNode to start
* @param stopCallback The callback that is responsible to handle the stopping of the processor after it has run once.
* @throws IllegalStateException if the Processor is disabled
*/
Future<Void> runProcessorOnce(ProcessorNode procNode, Callable<Future<Void>> stopCallback);
/**
* Stops scheduling the given processor to run and invokes all methods on
* the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that

View File

@ -199,6 +199,30 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
public abstract void start(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping);
/**
* Will run the {@link Processor} represented by this
* {@link ProcessorNode} once. This typically means invoking its
* operation that is annotated with @OnScheduled and then executing a
* callback provided by the {@link ProcessScheduler} to which typically
* initiates
* {@link Processor#onTrigger(ProcessContext, org.apache.nifi.processor.ProcessSessionFactory)}
* cycle and schedules the stopping of the processor right away.
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate processor <i>start</i> task
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
* @param timeoutMillis the number of milliseconds to wait after triggering the Processor's @OnScheduled methods before timing out and considering
* the startup a failure. This will result in the thread being interrupted and trying again.
* @param processContextFactory
* a factory for creating instances of {@link ProcessContext}
* @param schedulingAgentCallback
* the callback provided by the {@link ProcessScheduler} to
* execute upon successful start of the Processor
*/
public abstract void runOnce(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback);
/**
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.
* Stopping processor typically means invoking its operation that is

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.scheduling;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Connectable;
@ -25,6 +27,8 @@ public interface SchedulingAgent {
void schedule(Connectable connectable, LifecycleState scheduleState);
void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback);
void unschedule(Connectable connectable, LifecycleState scheduleState);
void onEvent(Connectable connectable);

View File

@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.function.Predicate;
@ -205,6 +206,16 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*/
Future<Void> startProcessor(ProcessorNode processor, boolean failIfStopping);
/**
* Runs the given Processor once and the stops it by calling the provided callback.
*
* @param processor the processor to start
* @param stopCallback the callback responsible for stopping the processor
* @throws IllegalStateException if the processor is not valid, or is
* already running
*/
Future<Void> runProcessorOnce(ProcessorNode processor, Callable<Future<Void>> stopCallback);
/**
* Starts the given Input Port
*

View File

@ -916,9 +916,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false);
break;
case STOPPED:
case RUN_ONCE:
if (procState == ScheduledState.DISABLED) {
procNode.getProcessGroup().enableProcessor(procNode);
} else if (procState == ScheduledState.RUNNING) {
} else if (procState == ScheduledState.RUNNING || procState == ScheduledState.RUN_ONCE) {
controller.stopProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier());
}
break;

View File

@ -20,6 +20,9 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.engine.FlowEngine;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* Base implementation of the {@link SchedulingAgent} which encapsulates the
* updates to the {@link LifecycleState} based on invoked operation and then
@ -46,6 +49,12 @@ abstract class AbstractSchedulingAgent implements SchedulingAgent {
this.doSchedule(connectable, scheduleState);
}
@Override
public void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
scheduleState.setScheduled(true);
this.doScheduleOnce(connectable, scheduleState, stopCallback);
}
@Override
public void unschedule(Connectable connectable, LifecycleState scheduleState) {
scheduleState.setScheduled(false);
@ -75,6 +84,18 @@ abstract class AbstractSchedulingAgent implements SchedulingAgent {
*/
protected abstract void doSchedule(Connectable connectable, LifecycleState scheduleState);
/**
* Schedules the provided {@link Connectable} to run once and then calls the provided stopCallback to stop it.
* Its {@link LifecycleState} will be set to <i>true</i>
*
* @param connectable
* the instance of {@link Connectable}
* @param scheduleState
* the instance of {@link LifecycleState}
* @param stopCallback the callback responsible for stopping connectable after it ran once
*/
protected abstract void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback);
/**
* Unschedules the provided {@link Connectable}. Its {@link LifecycleState}
* will be set to <i>false</i>

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractTimeBasedSchedulingAgent extends AbstractSchedulingAgent {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final FlowController flowController;
protected final RepositoryContextFactory contextFactory;
protected final StringEncryptor encryptor;
protected volatile String adminYieldDuration = "1 sec";
public AbstractTimeBasedSchedulingAgent(
final FlowEngine flowEngine,
final FlowController flowController,
final RepositoryContextFactory contextFactory,
final StringEncryptor encryptor
) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.encryptor = encryptor;
}
@Override
public void doScheduleOnce(final Connectable connectable, final LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
final List<ScheduledFuture<?>> futures = new ArrayList<>();
final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor);
final Runnable trigger = () -> {
connectableTask.invoke();
try {
stopCallback.call();
} catch (Exception e) {
String errorMessage = "Error while stopping " + connectable + " after running once.";
logger.error(errorMessage, e);
throw new ProcessException(errorMessage, e);
}
};
final ScheduledFuture<?> future = flowEngine.schedule(trigger, 1, TimeUnit.NANOSECONDS);
futures.add(future);
scheduleState.setFutures(futures);
}
@Override
public void setAdministrativeYieldDuration(final String yieldDuration) {
this.adminYieldDuration = yieldDuration;
}
@Override
public String getAdministrativeYieldDuration() {
return adminYieldDuration;
}
@Override
public long getAdministrativeYieldDuration(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit);
}
@Override
public void incrementMaxThreadCount(int toAdd) {
final int corePoolSize = flowEngine.getCorePoolSize();
if (toAdd < 0 && corePoolSize + toAdd < 1) {
throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool");
}
flowEngine.setCorePoolSize(corePoolSize + toAdd);
}
}

View File

@ -51,8 +51,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -125,6 +127,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
scheduleStates.put(connectable, scheduleState);
}
@Override
protected void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
throw new UnsupportedOperationException();
}
@Override
public void doUnschedule(final Connectable connectable, final LifecycleState scheduleState) {
workerQueue.suspendWork(connectable);

View File

@ -24,10 +24,7 @@ import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
@ -37,22 +34,11 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
private final Logger logger = LoggerFactory.getLogger(QuartzSchedulingAgent.class);
private final FlowController flowController;
private final RepositoryContextFactory contextFactory;
private final StringEncryptor encryptor;
private volatile String adminYieldDuration = "1 sec";
public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>();
public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory, final StringEncryptor enryptor) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.encryptor = enryptor;
super(flowEngine, flowController, contextFactory, enryptor);
}
@Override
@ -208,31 +194,6 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
public void setMaxThreadCount(final int maxThreads) {
}
@Override
public void setAdministrativeYieldDuration(final String yieldDuration) {
this.adminYieldDuration = yieldDuration;
}
@Override
public String getAdministrativeYieldDuration() {
return adminYieldDuration;
}
@Override
public long getAdministrativeYieldDuration(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit);
}
@Override
public void incrementMaxThreadCount(int toAdd) {
final int corePoolSize = flowEngine.getCorePoolSize();
if (toAdd < 0 && corePoolSize + toAdd < 1) {
throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool");
}
flowEngine.setCorePoolSize(corePoolSize + toAdd);
}
private static Date getNextSchedule(final Date currentSchedule, final CronExpression cronExpression) {
// Since the clock has not a millisecond precision, we have to check that we
// schedule the next time after the time this was supposed to run, otherwise

View File

@ -352,6 +352,47 @@ public final class StandardProcessScheduler implements ProcessScheduler {
return future;
}
/**
* Runs the given {@link Processor} once by invoking its
* {@link ProcessorNode#runOnce(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback)}
* method.
*
* @see ProcessorNode#runOnce(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback)
*/
@Override
public Future<Void> runProcessorOnce(ProcessorNode procNode, final Callable<Future<Void>> stopCallback) {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
final CompletableFuture<Void> future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@Override
public void trigger() {
lifecycleState.clearTerminationFlag();
getSchedulingAgent(procNode).scheduleOnce(procNode, lifecycleState, stopCallback);
future.complete(null);
}
@Override
public Future<?> scheduleTask(final Callable<?> task) {
lifecycleState.incrementActiveThreadCount(null);
return componentLifeCycleThreadPool.submit(task);
}
@Override
public void onTaskComplete() {
lifecycleState.decrementActiveThreadCount(null);
}
};
LOG.info("Running once {}", procNode);
procNode.runOnce(componentMonitoringThreadPool, administrativeYieldMillis, processorStartTimeoutMillis, processContextFactory, callback);
return future;
}
/**
* Stops the given {@link Processor} by invoking its
* {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)}
@ -372,7 +413,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public synchronized void terminateProcessor(final ProcessorNode procNode) {
if (procNode.getScheduledState() != ScheduledState.STOPPED) {
if (procNode.getScheduledState() != ScheduledState.STOPPED && procNode.getScheduledState() != ScheduledState.RUN_ONCE) {
throw new IllegalStateException("Cannot terminate " + procNode + " because it is not currently stopped");
}

View File

@ -26,8 +26,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@ -35,23 +33,12 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
public class TimerDrivenSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
private final long noWorkYieldNanos;
private final FlowController flowController;
private final RepositoryContextFactory contextFactory;
private final StringEncryptor encryptor;
private volatile String adminYieldDuration = "1 sec";
public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory,
final StringEncryptor encryptor, final NiFiProperties nifiProperties) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.encryptor = encryptor;
super(flowEngine, flowController, contextFactory, encryptor);
final String boredYieldDuration = nifiProperties.getBoredYieldDuration();
try {
@ -106,7 +93,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
logger.info("Scheduled {} to run with {} threads", connectable, connectable.getMaxConcurrentTasks());
}
private Runnable createTrigger(final ConnectableTask connectableTask, final LifecycleState scheduleState, final AtomicReference<ScheduledFuture<?>> futureRef) {
final Connectable connectable = connectableTask.getConnectable();
final Runnable yieldDetectionRunnable = new Runnable() {
@ -196,21 +182,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
logger.info("Stopped scheduling {} to run", taskNode.getReportingTask());
}
@Override
public void setAdministrativeYieldDuration(final String yieldDuration) {
this.adminYieldDuration = yieldDuration;
}
@Override
public String getAdministrativeYieldDuration() {
return adminYieldDuration;
}
@Override
public long getAdministrativeYieldDuration(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit);
}
@Override
public void onEvent(final Connectable connectable) {
}
@ -218,14 +189,4 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
@Override
public void setMaxThreadCount(final int maxThreads) {
}
@Override
public void incrementMaxThreadCount(int toAdd) {
final int corePoolSize = flowEngine.getCorePoolSize();
if (toAdd < 0 && corePoolSize + toAdd < 1) {
throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool");
}
flowEngine.setCorePoolSize(corePoolSize + toAdd);
}
}

View File

@ -208,7 +208,7 @@ public class ConnectableTask {
final String originalThreadName = Thread.currentThread().getName();
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) {
boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING;
boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING || connectable.getScheduledState() == ScheduledState.RUN_ONCE;
while (shouldRun) {
invocationCount++;
connectable.onTrigger(processContext, activeSessionFactory);

View File

@ -59,7 +59,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Predicate;
public class MockProcessGroup implements ProcessGroup {
@ -174,6 +176,11 @@ public class MockProcessGroup implements ProcessGroup {
return CompletableFuture.completedFuture(null);
}
@Override
public Future<Void> runProcessorOnce(ProcessorNode processor, Callable<Future<Void>> stopCallback) {
return CompletableFuture.completedFuture(null);
}
@Override
public void startInputPort(final Port port) {

View File

@ -467,6 +467,9 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
case DISABLED:
parentGroup.disableProcessor(processor);
break;
case RUN_ONCE:
parentGroup.runProcessorOnce(processor, () -> parentGroup.stopProcessor(processor));
break;
}
} catch (IllegalStateException | ComponentLifeCycleException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);

View File

@ -702,6 +702,46 @@
}
},
/**
* Runs a processor once.
*
* @argument {selection} selection The selection
*/
runOnce: function (selection) {
var componentsToRunOnce = selection.filter(function (d) {
return nfCanvasUtils.isRunnable(d3.select(this));
});
// ensure there are startable components selected
if (!componentsToRunOnce.empty()) {
var requests = [];
// start each selected component
componentsToRunOnce.each(function (d) {
var selected = d3.select(this);
// prepare the request
var uri, entity;
uri = d.uri + '/run-status';
entity = {
'revision': nfClient.getRevision(d),
'state': 'RUN_ONCE'
};
requests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType(d.type).set(response);
}));
});
// inform Angular app once the updates have completed
if (requests.length > 0) {
$.when.apply(window, requests).always(function () {
nfNgBridge.digest();
});
}
}
},
/**
* Stops the components in the specified selection.
*

View File

@ -429,6 +429,20 @@
return nfCanvasUtils.isProcessGroup(selection);
};
/**
* Determines whether the current selection is a processor.
*
* @param {selection} selection
*/
var isRunnableProcessor = function (selection) {
// ensure the correct number of components are selected
if (selection.size() !== 1) {
return false;
}
return isRunnable(selection) && nfCanvasUtils.isProcessor(selection);
};
/**
* Returns whether the process group supports downloading the current flow.
*
@ -804,6 +818,7 @@
{separator: true},
{id: 'start-menu-item', condition: isRunnable, menuItem: {clazz: 'fa fa-play', text: 'Start', action: 'start'}},
{id: 'stop-menu-item', condition: isStoppable, menuItem: {clazz: 'fa fa-stop', text: 'Stop', action: 'stop'}},
{id: 'run-once-menu-item', condition: isRunnableProcessor, menuItem: {clazz: 'fa fa-caret-right', text: 'Run Once', action: 'runOnce'}},
{id: 'terminate-menu-item', condition: canTerminate, menuItem: {clazz: 'fa fa-hourglass-end', text: 'Terminate', action: 'terminate'}},
{id: 'enable-menu-item', condition: canEnable, menuItem: {clazz: 'fa fa-flash', text: 'Enable', action: 'enable'}},
{id: 'disable-menu-item', condition: canDisable, menuItem: {clazz: 'icon icon-enable-false', text: 'Disable', action: 'disable'}},

View File

@ -139,6 +139,11 @@ public class StatelessProcessScheduler implements ProcessScheduler {
}
@Override
public Future<Void> runProcessorOnce(ProcessorNode procNode, final Callable<Future<Void>> stopCallback) {
throw new UnsupportedOperationException();
}
@Override
public Future<Void> stopProcessor(final ProcessorNode procNode) {
logger.info("Stopping {}", procNode);

View File

@ -28,6 +28,8 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class StatelessSchedulingAgent implements SchedulingAgent {
@ -42,6 +44,10 @@ public class StatelessSchedulingAgent implements SchedulingAgent {
public void schedule(final Connectable connectable, final LifecycleState scheduleState) {
}
@Override
public void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
}
@Override
public void unschedule(final Connectable connectable, final LifecycleState scheduleState) {
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.processor;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class RunOnceIT extends NiFiSystemIT {
@Test
public void testRunOnce() throws NiFiClientException, IOException, InterruptedException {
// GIVEN
ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
ConnectionEntity generateToTerminate = getClientUtil().createConnection(generate, terminate, "success");
// WHEN
getNifiClient().getProcessorClient().runProcessorOnce(generate);
// THEN
waitForQueueCount(generateToTerminate.getId(), 1);
ProcessorEntity actualGenerate = getNifiClient().getProcessorClient().getProcessor(generate.getId());
String actualRunStatus = actualGenerate.getStatus().getRunStatus();
assertEquals("Stopped", actualRunStatus);
assertEquals(1, getConnectionQueueSize(generateToTerminate.getId()));
}
}

View File

@ -31,6 +31,10 @@ public interface ProcessorClient {
ProcessorEntity startProcessor(ProcessorEntity processorEntity) throws NiFiClientException, IOException;
ProcessorEntity runProcessorOnce(String processorId, String clientId, long version) throws NiFiClientException, IOException;
ProcessorEntity runProcessorOnce(ProcessorEntity processorEntity) throws NiFiClientException, IOException;
ProcessorEntity stopProcessor(String processorId, String clientId, long version) throws NiFiClientException, IOException;
ProcessorEntity stopProcessor(ProcessorEntity processorEntity) throws NiFiClientException, IOException;

View File

@ -103,6 +103,16 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce
return startProcessor(processorEntity.getId(), processorEntity.getRevision().getClientId(), processorEntity.getRevision().getVersion());
}
@Override
public ProcessorEntity runProcessorOnce(final String processorId, final String clientId, final long version) throws NiFiClientException, IOException {
return updateProcessorState(processorId, "RUN_ONCE", clientId, version);
}
@Override
public ProcessorEntity runProcessorOnce(ProcessorEntity processorEntity) throws NiFiClientException, IOException {
return runProcessorOnce(processorEntity.getId(), processorEntity.getRevision().getClientId(), processorEntity.getRevision().getVersion());
}
@Override
public ProcessorEntity stopProcessor(final String processorId, final String clientId, final long version) throws NiFiClientException, IOException {
return updateProcessorState(processorId, "STOPPED", clientId, version);