From 59fac58c96f4127b1bf6fb7a501e934692534a03 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 9 Mar 2016 12:30:11 -0500 Subject: [PATCH] NIFI-1464 ensured that OnUnscheduled is treated the same as OnScheduled NIFI-1464 polished javadocs, error messages and docs --- .../org/apache/nifi/util/NiFiProperties.java | 2 +- .../main/asciidoc/administration-guide.adoc | 2 +- .../controller/StandardProcessorNode.java | 79 +++++++++++-------- .../scheduling/TestProcessorLifecycle.java | 4 +- 4 files changed, 49 insertions(+), 38 deletions(-) diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 2be81773a6..3d05a4775b 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -71,7 +71,7 @@ public class NiFiProperties extends Properties { public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration"; - public static final String PROCESSOR_START_TIMEOUT = "nifi.processor.start.timeout"; + public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout"; // content repository properties public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 58fe205ad0..9ca582d662 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -1153,7 +1153,7 @@ nifi.nar.library.directory.lib2=/nars/lib2 + Providing three total locations, including _nifi.nar.library.directory_. |nifi.nar.working.directory|The location of the nar working directory. The default value is ./work/nar and probably should be left as is. |nifi.documentation.working.directory|The documentation working directory. The default value is ./work/docs/components and probably should be left as is. -|nifi.processor.start.timeout|Time (milliseconds) to wait for a Processors to start before other life-cycle operation (e.g., stop) could be invoked. Default is infinite. +|nifi.processor.scheduling.timeout|Time (milliseconds) to wait for a Processor's life-cycle operation (@OnScheduled and @OnUnscheduled) to finish before other life-cycle operation (e.g., stop) could be invoked. Default is infinite. |==== diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 2a0819cf92..85c5fe8f7a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1241,9 +1241,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void run() { try { - SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(), + final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(), StandardProcessorNode.this, processContext.getStateManager()); - invokeOnScheduleAsync(taskScheduler, schedulingContext); + invokeTaskAsCancelableFuture(taskScheduler, new Callable() { + @SuppressWarnings("deprecation") + @Override + public Void call() throws Exception { + ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext); + return null; + } + }); if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state @@ -1302,12 +1309,20 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public void stop(final ScheduledExecutorService scheduler, final T processContext, final Callable activeThreadMonitorCallback) { if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once - final Runnable stopProcRunnable = new Runnable() { + invokeTaskAsCancelableFuture(scheduler, new Callable() { + @Override + public Void call() throws Exception { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + return null; + } + }); + // will continue to monitor active threads, invoking OnStopped once + // there are none + scheduler.execute(new Runnable() { @Override public void run() { try { if (activeThreadMonitorCallback.call()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); scheduledState.set(ScheduledState.STOPPED); } else { @@ -1317,8 +1332,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable LOG.warn("Failed while shutting down processor " + processor, e); } } - }; - scheduler.execute(stopProcRunnable); + }); } else { /* * We do compareAndSet() instead of set() to ensure that Processor @@ -1333,41 +1347,35 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * Will invoke processor's methods annotated with @OnSchedule asynchronously - * to ensure that it could be interrupted if stop action was initiated on - * the processor that may be sitting in the infinitely blocking @OnSchedule + * Will invoke lifecycle operation (OnScheduled or OnUnscheduled) + * asynchronously to ensure that it could be interrupted if stop action was + * initiated on the processor that may be infinitely blocking in such * operation. While this approach paves the way for further enhancements * related to managing processor'slife-cycle operation at the moment the * interrupt will not happen automatically. This is primarily to preserve - * the existing behavior or the NiFi where stop operation can only be + * the existing behavior of the NiFi where stop operation can only be * invoked once the processor is started. Unfortunately that could mean that - * the processor may be blocking indefinitely in the @Oncheduled call. To - * deal with that a new NiFi property has been introduced - * nifi.processor.start.timeout which allows one to set the time (in - * milliseconds) of how long to wait before canceling the @OnScheduled task - * allowing processor's stop sequence to proceed. The default value for this - * property is {@link Long#MAX_VALUE}. + * the processor may be blocking indefinitely in lifecycle operation + * (OnScheduled or OnUnscheduled). To deal with that a new NiFi property has + * been introduced nifi.processor.scheduling.timeout which allows one + * to set the time (in milliseconds) of how long to wait before canceling + * such lifecycle operation (OnScheduled or OnUnscheduled) allowing + * processor's stop sequence to proceed. The default value for this property + * is {@link Long#MAX_VALUE}. *

* NOTE: Canceling the task does not guarantee that the task will actually * completes (successfully or otherwise), since cancellation of the task - * will issue a simple Thread.interrupt(). However code inside - * of @OnScheduled operation is written purely and will ignore thread - * interrupts you may end up with runaway thread which may eventually - * require NiFi reboot. In any event, the above explanation will be logged - * (WARN) informing a user so further actions could be taken. + * will issue a simple Thread.interrupt(). However code inside of lifecycle + * operation (OnScheduled or OnUnscheduled) is written purely and will + * ignore thread interrupts you may end up with runaway thread which may + * eventually require NiFi reboot. In any event, the above explanation will + * be logged (WARN) informing a user so further actions could be taken. *

*/ - private void invokeOnScheduleAsync(ScheduledExecutorService taskScheduler, final SchedulingContext schedulingContext) throws ExecutionException { - Future executionResult = taskScheduler.submit(new Callable() { - @SuppressWarnings("deprecation") - @Override - public Void call() throws Exception { - ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext); - return null; - } - }); + private void invokeTaskAsCancelableFuture(ScheduledExecutorService taskScheduler, Callable task) { + Future executionResult = taskScheduler.submit(task); - String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT); + String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); @@ -1375,17 +1383,20 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() - + "' @OnSchedule operation to finish."); + + "' lifecycle operation (OnScheduled or OnUnscheduled) to finish."); Thread.currentThread().interrupt(); } catch (TimeoutException e) { executionResult.cancel(true); - LOG.warn("Timed out while waiting for the task executing @OnSchedule operation for '" + LOG.warn("Timed out while waiting for lifecycle operation (OnScheduled or OnUnscheduled) of '" + this.processor.getClass().getSimpleName() + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " - + "guarantee that the task will be canceled since the code inside @OnSchedule method may " + + "guarantee that the task will be canceled since the code inside current lifecycle operation (OnScheduled or OnUnscheduled) may " + "have been written to ignore interrupts which may result in runaway thread which could lead to more issues " + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + this.processor + "' that needs to be documented, reported and eventually fixed."); + } catch (ExecutionException e){ + throw new RuntimeException( + "Failed while executing one of processor's lifecycle tasks (OnScheduled or OnUnscheduled).", e); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index ca2e4faee1..560c4cbe8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -409,7 +409,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { - NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); + NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -439,7 +439,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { - NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); + NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup);