From 68707ce3c43f96e6a26789686c8f5bc397c6a532 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 16 Jan 2015 13:35:17 -0500 Subject: [PATCH] NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke OnScheduled, OnUnscheduled, OnStopped methods appropriately. --- .../scheduling/StandardProcessScheduler.java | 50 +++++++++---------- .../annotation/lifecycle/OnScheduled.java | 2 +- .../annotation/lifecycle/OnUnscheduled.java | 14 ++++++ 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 5950b4e819..e565ebc260 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -161,18 +161,21 @@ public final class StandardProcessScheduler implements ProcessScheduler { scheduleState.setScheduled(true); final Runnable startReportingTaskRunnable = new Runnable() { + @SuppressWarnings("deprecation") @Override public void run() { + // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time. while (true) { final ReportingTask reportingTask = taskNode.getReportingTask(); try { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); + ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); } + break; } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); LOG.error("", ite.getTargetException()); @@ -181,7 +184,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } catch (final InterruptedException ie) { } } catch (final Exception e) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); try { Thread.sleep(administrativeYieldMillis); @@ -213,34 +216,31 @@ public final class StandardProcessScheduler implements ProcessScheduler { public void run() { final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); - while (true) { - try { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); - } - break; - } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); - LOG.error("", ite.getTargetException()); + try { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); + } + } catch (final InvocationTargetException ite) { + LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); + LOG.error("", ite.getTargetException()); - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } - } catch (final Exception e) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { + } + } catch (final Exception e) { + LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { } } agent.unschedule(taskNode, scheduleState); - if (scheduleState.getActiveThreadCount() == 0) { + if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); } } diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java index 9dfd150526..a0703fa50e 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java @@ -41,7 +41,7 @@ import java.lang.annotation.Target; * *

* If using 1 argument and the component using the annotation is a Reporting Task, that argument must - * be of type {@link org.apache.nifi.reporting.ReportingContext ReportingContext}. + * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}. *

* * If any method annotated with this annotation throws any Throwable, the framework will wait a while diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java index 68d0fe8c8a..b1dbde1bf2 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java @@ -33,6 +33,20 @@ import java.lang.annotation.Target; * threads are potentially running. To invoke a method after all threads have * finished processing, see the {@link OnStopped} annotation. *

+ * + *

+ * Methods using this annotation must take either 0 arguments or a single argument. + *

+ * + *

+ * If using 1 argument and the component using the annotation is a Processor, that argument must + * be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}. + *

+ * + *

+ * If using 1 argument and the component using the annotation is a Reporting Task, that argument must + * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}. + *

* * @author none */