NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke OnScheduled, OnUnscheduled, OnStopped methods appropriately.

This commit is contained in:
Mark Payne 2015-01-16 13:35:17 -05:00
parent 6b5d1a86be
commit 68707ce3c4
3 changed files with 40 additions and 26 deletions

View File

@ -161,18 +161,21 @@ public final class StandardProcessScheduler implements ProcessScheduler {
scheduleState.setScheduled(true);
final Runnable startReportingTaskRunnable = new Runnable() {
@SuppressWarnings("deprecation")
@Override
public void run() {
// Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
while (true) {
final ReportingTask reportingTask = taskNode.getReportingTask();
try {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
break;
} catch (final InvocationTargetException ite) {
LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
LOG.error("", ite.getTargetException());
@ -181,7 +184,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
} catch (final InterruptedException ie) {
}
} catch (final Exception e) {
LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
try {
Thread.sleep(administrativeYieldMillis);
@ -213,34 +216,31 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public void run() {
final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
while (true) {
try {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
}
break;
} catch (final InvocationTargetException ite) {
LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
LOG.error("", ite.getTargetException());
try {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
}
} catch (final InvocationTargetException ite) {
LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
LOG.error("", ite.getTargetException());
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
} catch (final Exception e) {
LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
} catch (final Exception e) {
LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
}
agent.unschedule(taskNode, scheduleState);
if (scheduleState.getActiveThreadCount() == 0) {
if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
}
}

View File

@ -41,7 +41,7 @@ import java.lang.annotation.Target;
*
* <p>
* If using 1 argument and the component using the annotation is a Reporting Task, that argument must
* be of type {@link org.apache.nifi.reporting.ReportingContext ReportingContext}.
* be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
* </p>
*
* If any method annotated with this annotation throws any Throwable, the framework will wait a while

View File

@ -34,6 +34,20 @@ import java.lang.annotation.Target;
* finished processing, see the {@link OnStopped} annotation.
* </p>
*
* <p>
* Methods using this annotation must take either 0 arguments or a single argument.
* </p>
*
* <p>
* If using 1 argument and the component using the annotation is a Processor, that argument must
* be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}.
* </p>
*
* <p>
* If using 1 argument and the component using the annotation is a Reporting Task, that argument must
* be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
* </p>
*
* @author none
*/
@Documented