NIFI-1622: Ensure that the Nar Context Class Loader is used when calling Processor lifecycle methods

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2016-03-11 14:51:21 -05:00 committed by joewitt
parent a85c119533
commit 7400b6f7c5
1 changed files with 16 additions and 6 deletions

View File

@ -1247,14 +1247,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext); try (final NarCloseable nc = NarCloseable.withNarLoader()) {
return null; ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext);
return null;
}
} }
}); });
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED); scheduledState.set(ScheduledState.STOPPED);
} }
} catch (final Exception e) { } catch (final Exception e) {
@ -1312,8 +1317,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
invokeTaskAsCancelableFuture(scheduler, new Callable<Void>() { invokeTaskAsCancelableFuture(scheduler, new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); try (final NarCloseable nc = NarCloseable.withNarLoader()) {
return null; ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
return null;
}
} }
}); });
// will continue to monitor active threads, invoking OnStopped once // will continue to monitor active threads, invoking OnStopped once
@ -1323,7 +1330,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public void run() { public void run() {
try { try {
if (activeThreadMonitorCallback.call()) { if (activeThreadMonitorCallback.call()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED); scheduledState.set(ScheduledState.STOPPED);
} else { } else {
scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);