diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 6c37418c15..78135300bd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -155,7 +155,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen * execute upon successful start of the Processor */ public abstract void start(ScheduledExecutorService scheduler, - long administrativeYieldMillis, T processContext, Runnable schedulingAgentCallback); + long administrativeYieldMillis, T processContext, SchedulingAgentCallback schedulingAgentCallback); /** * Will stop the {@link Processor} represented by this {@link ProcessorNode}. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java new file mode 100644 index 0000000000..9d66e38161 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +public interface SchedulingAgentCallback { + void postMonitor(); + + Future invokeMonitoringTask(Callable task); + + void trigger(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 9a6eba595e..fa7bd30a8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1231,41 +1231,41 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public void start(final ScheduledExecutorService taskScheduler, - final long administrativeYieldMillis, final T processContext, final Runnable schedulingAgentCallback) { + final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) { if (!this.isValid()) { throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); } - + final ProcessorLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING)) { // will ensure that the Processor represented by this node can only be started once final Runnable startProcRunnable = new Runnable() { @Override public void run() { try { - final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(), - StandardProcessorNode.this, processContext.getStateManager()); - invokeTaskAsCancelableFuture(taskScheduler, new Callable() { - @SuppressWarnings("deprecation") + invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable() { @Override public Void call() throws Exception { try (final NarCloseable nc = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext); + SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, + getControllerServiceProvider(), StandardProcessorNode.this, + processContext.getStateManager()); + ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, + org.apache.nifi.processor.annotation.OnScheduled.class, processor, + schedulingContext); return null; } } }); + if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { - schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle + schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state try (final NarCloseable nc = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); } - scheduledState.set(ScheduledState.STOPPED); } } catch (final Exception e) { final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; - final ProcessorLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - procLog.error( "{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}", new Object[] { StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " milliseconds" }, cause); LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); @@ -1281,7 +1281,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable }; taskScheduler.execute(startProcRunnable); } else { - LOG.warn("Can not start Processor since it's already in the process of being started or it is DISABLED"); + String procName = this.processor.getClass().getSimpleName(); + LOG.warn("Can not start '" + procName + + "' since it's already in the process of being started or it is DISABLED - " + + scheduledState.get()); + procLog.warn("Can not start '" + procName + + "' since it's already in the process of being started or it is DISABLED - " + + scheduledState.get()); } } @@ -1313,27 +1319,23 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void stop(final ScheduledExecutorService scheduler, final T processContext, final Callable activeThreadMonitorCallback) { + LOG.info("Stopping processor: " + this.processor.getClass()); if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once - invokeTaskAsCancelableFuture(scheduler, new Callable() { - @Override - public Void call() throws Exception { - try (final NarCloseable nc = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); - return null; - } - } - }); // will continue to monitor active threads, invoking OnStopped once // there are none scheduler.execute(new Runnable() { + boolean unscheduled = false; @Override public void run() { + if (!this.unscheduled){ + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + this.unscheduled = true; + } try { if (activeThreadMonitorCallback.call()) { try (final NarCloseable nc = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); } - scheduledState.set(ScheduledState.STOPPED); } else { scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); @@ -1382,31 +1384,32 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * be logged (WARN) informing a user so further actions could be taken. *

*/ - private void invokeTaskAsCancelableFuture(ScheduledExecutorService taskScheduler, Callable task) { - Future executionResult = taskScheduler.submit(task); - + private void invokeTaskAsCancelableFuture(SchedulingAgentCallback callback, Callable task) { String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); - long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE + long onScheduleTimeout = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); - + Future taskFuture = callback.invokeMonitoringTask(task); try { - executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS); + taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() - + "' lifecycle operation (OnScheduled or OnUnscheduled) to finish."); + + "' lifecycle OnScheduled operation to finish."); Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e); } catch (TimeoutException e) { - executionResult.cancel(true); - LOG.warn("Timed out while waiting for lifecycle operation (OnScheduled or OnUnscheduled) of '" + taskFuture.cancel(true); + LOG.warn("Timed out while waiting for OnScheduled of '" + this.processor.getClass().getSimpleName() + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " - + "guarantee that the task will be canceled since the code inside current lifecycle operation (OnScheduled or OnUnscheduled) may " + + "guarantee that the task will be canceled since the code inside current OnScheduled operation may " + "have been written to ignore interrupts which may result in runaway thread which could lead to more issues " + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + this.processor + "' that needs to be documented, reported and eventually fixed."); + throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e); } catch (ExecutionException e){ - throw new RuntimeException( - "Failed while executing one of processor's lifecycle tasks (OnScheduled or OnUnscheduled).", e); + throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); + } finally { + callback.postMonitor(); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java index 3544dac571..8f36e1e850 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.scheduling; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.engine.FlowEngine; /** * Base implementation of the {@link SchedulingAgent} which encapsulates the @@ -33,6 +34,12 @@ import org.apache.nifi.controller.ReportingTaskNode; */ abstract class AbstractSchedulingAgent implements SchedulingAgent { + protected final FlowEngine flowEngine; + + protected AbstractSchedulingAgent(FlowEngine flowEngine) { + this.flowEngine = flowEngine; + } + @Override public void schedule(Connectable connectable, ScheduleState scheduleState) { scheduleState.setScheduled(true); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 37cab01d3b..228af707d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -55,7 +55,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class); - private final FlowEngine flowEngine; private final ControllerServiceProvider serviceProvider; private final StateManagerProvider stateManagerProvider; private final EventDrivenWorkerQueue workerQueue; @@ -70,7 +69,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider, final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { - this.flowEngine = flowEngine; + super(flowEngine); this.serviceProvider = serviceProvider; this.stateManagerProvider = stateManagerProvider; this.workerQueue = workerQueue; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index 31c1ca4e65..3f19d28300 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -49,16 +49,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { private final FlowController flowController; private final ProcessContextFactory contextFactory; - private final FlowEngine flowEngine; private final StringEncryptor encryptor; private volatile String adminYieldDuration = "1 sec"; private final Map> canceledTriggers = new HashMap<>(); public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor) { + super(flowEngine); this.flowController = flowController; this.contextFactory = contextFactory; - this.flowEngine = flowEngine; this.encryptor = enryptor; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index bbaa23b507..f7e968ee55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -42,6 +42,7 @@ import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.service.ControllerServiceNode; @@ -79,7 +80,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final ConcurrentMap strategyAgentMap = new ConcurrentHashMap<>(); // thread pool for starting/stopping components - private final ScheduledExecutorService componentLifeCycleThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); + private final ScheduledExecutorService componentLifeCycleThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); + private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); private final StringEncryptor encryptor; @@ -160,6 +162,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { frameworkTaskExecutor.shutdown(); componentLifeCycleThreadPool.shutdown(); + componentMonitoringThreadPool.shutdown(); } @Override @@ -295,14 +298,27 @@ public final class StandardProcessScheduler implements ProcessScheduler { StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, this.encryptor, getStateManager(procNode.getIdentifier())); final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode)); - Runnable schedulingAgentCallback = new Runnable() { + + SchedulingAgentCallback callback = new SchedulingAgentCallback() { @Override - public void run() { + public void trigger() { getSchedulingAgent(procNode).schedule(procNode, scheduleState); heartbeater.heartbeat(); } + + @Override + public Future invokeMonitoringTask(Callable task) { + scheduleState.incrementActiveThreadCount(); + return componentMonitoringThreadPool.submit(task); + } + + @Override + public void postMonitor() { + scheduleState.decrementActiveThreadCount(); + } }; - procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, schedulingAgentCallback); + + procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback); } /** @@ -317,6 +333,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, this.encryptor, getStateManager(procNode.getIdentifier())); final ScheduleState state = getScheduleState(procNode); + procNode.stop(this.componentLifeCycleThreadPool, processContext, new Callable() { @Override public Boolean call() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index 76c413fa3e..0436e21064 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -48,15 +48,14 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { private final long noWorkYieldNanos; private final FlowController flowController; - private final FlowEngine flowEngine; private final ProcessContextFactory contextFactory; private final StringEncryptor encryptor; private volatile String adminYieldDuration = "1 sec"; public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor encryptor) { + super(flowEngine); this.flowController = flowController; - this.flowEngine = flowEngine; this.contextFactory = contextFactory; this.encryptor = encryptor;