mirror of https://github.com/apache/nifi.git
NIFI-2776: This closes #2315. When joining a cluster, if a processor is stopping but cluster indicates that processor should be running, cause processor to start when its last thread finishes
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
a774f1df69
commit
8e6649ba15
|
@ -40,9 +40,13 @@ public interface ProcessScheduler {
|
|||
* is already scheduled to run, does nothing.
|
||||
*
|
||||
* @param procNode to start
|
||||
* @param failIfStopping If <code>false</code>, and the Processor is in the 'STOPPING' state,
|
||||
* then the Processor will automatically restart itself as soon as its last thread finishes. If this
|
||||
* value is <code>true</code> or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method
|
||||
* will throw an {@link IllegalStateException}.
|
||||
* @throws IllegalStateException if the Processor is disabled
|
||||
*/
|
||||
Future<Void> startProcessor(ProcessorNode procNode);
|
||||
Future<Void> startProcessor(ProcessorNode procNode, boolean failIfStopping);
|
||||
|
||||
/**
|
||||
* Stops scheduling the given processor to run and invokes all methods on
|
||||
|
|
|
@ -160,34 +160,37 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
|
|||
* @param administrativeYieldMillis
|
||||
* the amount of milliseconds to wait for administrative yield
|
||||
* @param processContext
|
||||
* the instance of {@link ProcessContext} and
|
||||
* {@link ControllerServiceLookup}
|
||||
* the instance of {@link ProcessContext}
|
||||
* @param schedulingAgentCallback
|
||||
* the callback provided by the {@link ProcessScheduler} to
|
||||
* execute upon successful start of the Processor
|
||||
* @param failIfStopping If <code>false</code>, and the Processor is in the 'STOPPING' state,
|
||||
* then the Processor will automatically restart itself as soon as its last thread finishes. If this
|
||||
* value is <code>true</code> or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method
|
||||
* will throw an {@link IllegalStateException}.
|
||||
*/
|
||||
public abstract <T extends ProcessContext & ControllerServiceLookup> void start(ScheduledExecutorService scheduler,
|
||||
long administrativeYieldMillis, T processContext, SchedulingAgentCallback schedulingAgentCallback);
|
||||
public abstract void start(ScheduledExecutorService scheduler,
|
||||
long administrativeYieldMillis, ProcessContext processContext, SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping);
|
||||
|
||||
/**
|
||||
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.
|
||||
* Stopping processor typically means invoking its operation that is
|
||||
* annotated with @OnUnschedule and then @OnStopped.
|
||||
*
|
||||
* @param scheduler
|
||||
* @param processScheduler the ProcessScheduler that can be used to re-schedule the processor if need be
|
||||
* @param executor
|
||||
* implementation of {@link ScheduledExecutorService} used to
|
||||
* initiate processor <i>stop</i> task
|
||||
* @param processContext
|
||||
* the instance of {@link ProcessContext} and
|
||||
* {@link ControllerServiceLookup}
|
||||
* the instance of {@link ProcessContext}
|
||||
* @param schedulingAgent
|
||||
* the SchedulingAgent that is responsible for managing the scheduling of the ProcessorNode
|
||||
* @param scheduleState
|
||||
* the ScheduleState that can be used to ensure that the running state (STOPPED, RUNNING, etc.)
|
||||
* as well as the active thread counts are kept in sync
|
||||
*/
|
||||
public abstract <T extends ProcessContext & ControllerServiceLookup> CompletableFuture<Void> stop(ScheduledExecutorService scheduler,
|
||||
T processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState);
|
||||
public abstract CompletableFuture<Void> stop(ProcessScheduler processScheduler, ScheduledExecutorService executor,
|
||||
ProcessContext processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState);
|
||||
|
||||
/**
|
||||
* Will set the state of the processor to STOPPED which essentially implies
|
||||
|
|
|
@ -20,9 +20,9 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.Future;
|
||||
|
||||
public interface SchedulingAgentCallback {
|
||||
void postMonitor();
|
||||
void onTaskComplete();
|
||||
|
||||
Future<?> invokeMonitoringTask(Callable<?> task);
|
||||
Future<?> scheduleTask(Callable<?> task);
|
||||
|
||||
void trigger();
|
||||
}
|
||||
|
|
|
@ -161,10 +161,14 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
|
|||
* Starts the given Processor
|
||||
*
|
||||
* @param processor the processor to start
|
||||
* @param failIfStopping If <code>false</code>, and the Processor is in the 'STOPPING' state,
|
||||
* then the Processor will automatically restart itself as soon as its last thread finishes. If this
|
||||
* value is <code>true</code> or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method
|
||||
* will throw an {@link IllegalStateException}.
|
||||
* @throws IllegalStateException if the processor is not valid, or is
|
||||
* already running
|
||||
* already running
|
||||
*/
|
||||
CompletableFuture<Void> startProcessor(ProcessorNode processor);
|
||||
CompletableFuture<Void> startProcessor(ProcessorNode processor, boolean failIfStopping);
|
||||
|
||||
/**
|
||||
* Starts the given Input Port
|
||||
|
|
|
@ -805,7 +805,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
|
||||
try {
|
||||
if (connectable instanceof ProcessorNode) {
|
||||
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
|
||||
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
|
||||
} else {
|
||||
startConnectable(connectable);
|
||||
}
|
||||
|
@ -2984,6 +2984,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
}
|
||||
|
||||
public void startProcessor(final String parentGroupId, final String processorId) {
|
||||
startProcessor(parentGroupId, processorId, true);
|
||||
}
|
||||
|
||||
public void startProcessor(final String parentGroupId, final String processorId, final boolean failIfStopping) {
|
||||
final ProcessGroup group = lookupGroup(parentGroupId);
|
||||
final ProcessorNode node = group.getProcessor(processorId);
|
||||
if (node == null) {
|
||||
|
@ -2993,7 +2997,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
writeLock.lock();
|
||||
try {
|
||||
if (initialized.get()) {
|
||||
group.startProcessor(node);
|
||||
group.startProcessor(node, failIfStopping);
|
||||
} else {
|
||||
startConnectablesAfterInitialization.add(node);
|
||||
}
|
||||
|
|
|
@ -761,7 +761,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
case RUNNING:
|
||||
// we want to run now. Make sure processor is not disabled and then start it.
|
||||
procNode.getProcessGroup().enableProcessor(procNode);
|
||||
controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier());
|
||||
controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false);
|
||||
break;
|
||||
case STOPPED:
|
||||
if (procNode.getScheduledState() == ScheduledState.DISABLED) {
|
||||
|
|
|
@ -130,6 +130,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
private long runNanos = 0L;
|
||||
private volatile long yieldNanos;
|
||||
private final NiFiProperties nifiProperties;
|
||||
private volatile ScheduledState desiredState;
|
||||
|
||||
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
|
||||
// ??????? NOT any more
|
||||
|
@ -1281,68 +1282,81 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
* </p>
|
||||
*/
|
||||
@Override
|
||||
public <T extends ProcessContext & ControllerServiceLookup> void start(final ScheduledExecutorService taskScheduler,
|
||||
final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) {
|
||||
public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final ProcessContext processContext,
|
||||
final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {
|
||||
|
||||
if (!this.isValid()) {
|
||||
throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
|
||||
}
|
||||
final Processor processor = processorRef.get().getProcessor();
|
||||
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
|
||||
|
||||
final boolean starting;
|
||||
ScheduledState currentState;
|
||||
boolean starting;
|
||||
synchronized (this) {
|
||||
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);
|
||||
currentState = this.scheduledState.get();
|
||||
|
||||
if (currentState == ScheduledState.STOPPED) {
|
||||
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);
|
||||
if (starting) {
|
||||
desiredState = ScheduledState.RUNNING;
|
||||
}
|
||||
} else if (currentState == ScheduledState.STOPPING && !failIfStopping) {
|
||||
desiredState = ScheduledState.RUNNING;
|
||||
return;
|
||||
} else {
|
||||
starting = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (starting) { // will ensure that the Processor represented by this node can only be started once
|
||||
final Runnable startProcRunnable = new Runnable() {
|
||||
taskScheduler.execute(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback));
|
||||
} else {
|
||||
final String procName = processorRef.get().toString();
|
||||
LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState);
|
||||
procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[] {procName, currentState});
|
||||
}
|
||||
}
|
||||
|
||||
private void initiateStart(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis,
|
||||
final ProcessContext processContext, final SchedulingAgentCallback schedulingAgentCallback) {
|
||||
|
||||
final Processor processor = getProcessor();
|
||||
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
|
||||
|
||||
try {
|
||||
invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
|
||||
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
|
||||
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
|
||||
}
|
||||
scheduledState.set(ScheduledState.STOPPED);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds",
|
||||
new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause);
|
||||
LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
|
||||
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
|
||||
|
||||
if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated
|
||||
taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
scheduledState.set(ScheduledState.STOPPED);
|
||||
}
|
||||
public Void call() throws Exception {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
};
|
||||
taskScheduler.execute(startProcRunnable);
|
||||
} else {
|
||||
final String procName = processorRef.getClass().getSimpleName();
|
||||
LOG.warn("Can not start '" + procName
|
||||
+ "' since it's already in the process of being started or it is DISABLED - "
|
||||
+ scheduledState.get());
|
||||
procLog.warn("Can not start '" + procName
|
||||
+ "' since it's already in the process of being started or it is DISABLED - "
|
||||
+ scheduledState.get());
|
||||
});
|
||||
|
||||
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
|
||||
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
|
||||
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
|
||||
}
|
||||
scheduledState.set(ScheduledState.STOPPED);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds",
|
||||
new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause);
|
||||
LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
|
||||
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
|
||||
|
||||
if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated
|
||||
taskScheduler.schedule(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback), administrativeYieldMillis, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
scheduledState.set(ScheduledState.STOPPED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1373,11 +1387,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
* </p>
|
||||
*/
|
||||
@Override
|
||||
public <T extends ProcessContext & ControllerServiceLookup> CompletableFuture<Void> stop(final ScheduledExecutorService scheduler,
|
||||
final T processContext, final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) {
|
||||
public CompletableFuture<Void> stop(final ProcessScheduler processScheduler, final ScheduledExecutorService executor, final ProcessContext processContext,
|
||||
final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) {
|
||||
|
||||
final Processor processor = processorRef.get().getProcessor();
|
||||
LOG.info("Stopping processor: " + processor.getClass());
|
||||
desiredState = ScheduledState.STOPPED;
|
||||
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
|
||||
|
@ -1385,7 +1400,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
|
||||
// will continue to monitor active threads, invoking OnStopped once there are no
|
||||
// active threads (with the exception of the thread performing shutdown operations)
|
||||
scheduler.execute(new Runnable() {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
@ -1407,9 +1422,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
scheduleState.decrementActiveThreadCount();
|
||||
scheduledState.set(ScheduledState.STOPPED);
|
||||
future.complete(null);
|
||||
|
||||
if (desiredState == ScheduledState.RUNNING) {
|
||||
processScheduler.startProcessor(StandardProcessorNode.this, true);
|
||||
}
|
||||
} else {
|
||||
// Not all of the active threads have finished. Try again in 100 milliseconds.
|
||||
scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
|
||||
executor.schedule(this, 100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
LOG.warn("Failed while shutting down processor " + processor, e);
|
||||
|
@ -1461,7 +1480,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
|
||||
final long onScheduleTimeout = timeoutString == null ? 60000
|
||||
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
|
||||
final Future<?> taskFuture = callback.invokeMonitoringTask(task);
|
||||
final Future<?> taskFuture = callback.scheduleTask(task);
|
||||
try {
|
||||
taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
|
||||
} catch (final InterruptedException e) {
|
||||
|
@ -1482,7 +1501,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
} catch (final ExecutionException e){
|
||||
throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e);
|
||||
} finally {
|
||||
callback.postMonitor();
|
||||
callback.onTaskComplete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -297,13 +297,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
* @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)
|
||||
*/
|
||||
@Override
|
||||
public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode) {
|
||||
StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
|
||||
public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
|
||||
this.encryptor, getStateManager(procNode.getIdentifier()));
|
||||
final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode));
|
||||
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
SchedulingAgentCallback callback = new SchedulingAgentCallback() {
|
||||
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
|
||||
@Override
|
||||
public void trigger() {
|
||||
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
|
||||
|
@ -311,19 +311,19 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<?> invokeMonitoringTask(Callable<?> task) {
|
||||
public Future<?> scheduleTask(Callable<?> task) {
|
||||
scheduleState.incrementActiveThreadCount();
|
||||
return componentMonitoringThreadPool.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMonitor() {
|
||||
public void onTaskComplete() {
|
||||
scheduleState.decrementActiveThreadCount();
|
||||
}
|
||||
};
|
||||
|
||||
LOG.info("Starting {}", procNode);
|
||||
procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback);
|
||||
procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping);
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -341,7 +341,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
final ScheduleState state = getScheduleState(procNode);
|
||||
|
||||
LOG.info("Stopping {}", procNode);
|
||||
return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
|
||||
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -285,7 +285,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
// start all of the components that are not disabled
|
||||
for (final ProcessorNode node : processors) {
|
||||
if (node.getScheduledState() != ScheduledState.DISABLED) {
|
||||
node.getProcessGroup().startProcessor(node);
|
||||
node.getProcessGroup().startProcessor(node, true);
|
||||
updated.add(node);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -320,7 +320,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
try {
|
||||
findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> {
|
||||
try {
|
||||
node.getProcessGroup().startProcessor(node);
|
||||
node.getProcessGroup().startProcessor(node, true);
|
||||
} catch (final Throwable t) {
|
||||
LOG.error("Unable to start processor {} due to {}", new Object[]{node.getIdentifier(), t});
|
||||
}
|
||||
|
@ -1092,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> startProcessor(final ProcessorNode processor) {
|
||||
public CompletableFuture<Void> startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
|
||||
readLock.lock();
|
||||
try {
|
||||
if (getProcessor(processor.getIdentifier()) == null) {
|
||||
|
@ -1106,7 +1106,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
return scheduler.startProcessor(processor);
|
||||
return scheduler.startProcessor(processor, failIfStopping);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
|
|
@ -105,11 +105,11 @@ public class TestStandardProcessorNode {
|
|||
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null);
|
||||
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
|
||||
@Override
|
||||
public void postMonitor() {
|
||||
public void onTaskComplete() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> invokeMonitoringTask(final Callable<?> task) {
|
||||
public Future<?> scheduleTask(final Callable<?> task) {
|
||||
return taskScheduler.submit(task);
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ public class TestStandardProcessorNode {
|
|||
}
|
||||
};
|
||||
|
||||
procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback);
|
||||
procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback, true);
|
||||
|
||||
Thread.sleep(1000L);
|
||||
assertEquals(1, processor.onScheduledCount);
|
||||
|
|
|
@ -160,7 +160,7 @@ public class TestProcessorLifecycle {
|
|||
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
|
||||
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
|
||||
}
|
||||
|
||||
|
@ -184,9 +184,9 @@ public class TestProcessorLifecycle {
|
|||
this.noop(testProcessor);
|
||||
final ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
|
||||
Thread.sleep(500);
|
||||
assertCondition(() -> testProcessor.operationNames.size() == 1);
|
||||
|
@ -302,7 +302,7 @@ public class TestProcessorLifecycle {
|
|||
@Override
|
||||
public void run() {
|
||||
LockSupport.parkNanos(random.nextInt(9000000));
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
countDownCounter.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -342,7 +342,7 @@ public class TestProcessorLifecycle {
|
|||
this.longRunningOnSchedule(testProcessor, delay);
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L);
|
||||
|
||||
ps.stopProcessor(testProcNode);
|
||||
|
@ -375,7 +375,7 @@ public class TestProcessorLifecycle {
|
|||
testProcessor.keepFailingOnScheduledTimes = 2;
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L);
|
||||
ps.stopProcessor(testProcNode);
|
||||
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
|
||||
|
@ -404,7 +404,7 @@ public class TestProcessorLifecycle {
|
|||
testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
|
||||
ps.stopProcessor(testProcNode);
|
||||
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
|
||||
|
@ -429,7 +429,7 @@ public class TestProcessorLifecycle {
|
|||
this.blockingInterruptableOnUnschedule(testProcessor);
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
|
||||
ps.stopProcessor(testProcNode);
|
||||
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
|
||||
|
@ -454,7 +454,7 @@ public class TestProcessorLifecycle {
|
|||
this.blockingUninterruptableOnUnschedule(testProcessor);
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L);
|
||||
ps.stopProcessor(testProcNode);
|
||||
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L);
|
||||
|
@ -481,7 +481,7 @@ public class TestProcessorLifecycle {
|
|||
testProcessor.generateExceptionOnTrigger = true;
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
|
||||
ps.disableProcessor(testProcNode);
|
||||
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
|
||||
|
@ -503,7 +503,7 @@ public class TestProcessorLifecycle {
|
|||
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
|
||||
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
fail();
|
||||
}
|
||||
|
||||
|
@ -531,7 +531,7 @@ public class TestProcessorLifecycle {
|
|||
testProcessor.withService = true;
|
||||
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
fail();
|
||||
}
|
||||
|
||||
|
@ -563,7 +563,7 @@ public class TestProcessorLifecycle {
|
|||
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
ps.enableControllerService(testServiceNode);
|
||||
ps.startProcessor(testProcNode);
|
||||
ps.startProcessor(testProcNode, true);
|
||||
|
||||
Thread.sleep(500);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
|
||||
|
@ -598,8 +598,8 @@ public class TestProcessorLifecycle {
|
|||
testGroup.addConnection(connection);
|
||||
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
ps.startProcessor(testProcNodeA);
|
||||
ps.startProcessor(testProcNodeB);
|
||||
ps.startProcessor(testProcNodeA, true);
|
||||
ps.startProcessor(testProcNodeB, true);
|
||||
|
||||
try {
|
||||
testGroup.removeProcessor(testProcNodeA);
|
||||
|
|
|
@ -206,7 +206,7 @@ public class TestStandardProcessScheduler {
|
|||
procNode.setProperties(procProps);
|
||||
|
||||
scheduler.enableControllerService(service);
|
||||
scheduler.startProcessor(procNode);
|
||||
scheduler.startProcessor(procNode, true);
|
||||
|
||||
Thread.sleep(1000L);
|
||||
|
||||
|
|
|
@ -149,7 +149,7 @@ public class MockProcessGroup implements ProcessGroup {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> startProcessor(final ProcessorNode processor) {
|
||||
public CompletableFuture<Void> startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
|
|||
final Connectable connectable = group.findLocalConnectable(componentId);
|
||||
if (ScheduledState.RUNNING.equals(state)) {
|
||||
if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
|
||||
final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
|
||||
final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
|
||||
future = CompletableFuture.allOf(future, processorFuture);
|
||||
} else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
|
||||
connectable.getProcessGroup().startInputPort((Port) connectable);
|
||||
|
|
|
@ -426,7 +426,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
// perform the appropriate action
|
||||
switch (purposedScheduledState) {
|
||||
case RUNNING:
|
||||
parentGroup.startProcessor(processor);
|
||||
parentGroup.startProcessor(processor, true);
|
||||
break;
|
||||
case STOPPED:
|
||||
switch (processor.getScheduledState()) {
|
||||
|
|
Loading…
Reference in New Issue