NIFI-1464 ensured that OnUnscheduled is treated the same as OnScheduled

NIFI-1464 polished javadocs, error messages and docs
This commit is contained in:
Oleg Zhurakousky 2016-03-09 12:30:11 -05:00 committed by Mark Payne
parent 1c22f3f012
commit 59fac58c96
4 changed files with 49 additions and 38 deletions

View File

@ -71,7 +71,7 @@ public class NiFiProperties extends Properties {
public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration"; public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration";
public static final String PROCESSOR_START_TIMEOUT = "nifi.processor.start.timeout"; public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout";
// content repository properties // content repository properties
public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";

View File

@ -1153,7 +1153,7 @@ nifi.nar.library.directory.lib2=/nars/lib2 +
Providing three total locations, including _nifi.nar.library.directory_. Providing three total locations, including _nifi.nar.library.directory_.
|nifi.nar.working.directory|The location of the nar working directory. The default value is ./work/nar and probably should be left as is. |nifi.nar.working.directory|The location of the nar working directory. The default value is ./work/nar and probably should be left as is.
|nifi.documentation.working.directory|The documentation working directory. The default value is ./work/docs/components and probably should be left as is. |nifi.documentation.working.directory|The documentation working directory. The default value is ./work/docs/components and probably should be left as is.
|nifi.processor.start.timeout|Time (milliseconds) to wait for a Processors to start before other life-cycle operation (e.g., stop) could be invoked. Default is infinite. |nifi.processor.scheduling.timeout|Time (milliseconds) to wait for a Processor's life-cycle operation (@OnScheduled and @OnUnscheduled) to finish before other life-cycle operation (e.g., stop) could be invoked. Default is infinite.
|==== |====

View File

@ -1241,9 +1241,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override @Override
public void run() { public void run() {
try { try {
SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(), final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(),
StandardProcessorNode.this, processContext.getStateManager()); StandardProcessorNode.this, processContext.getStateManager());
invokeOnScheduleAsync(taskScheduler, schedulingContext); invokeTaskAsCancelableFuture(taskScheduler, new Callable<Void>() {
@SuppressWarnings("deprecation")
@Override
public Void call() throws Exception {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext);
return null;
}
});
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
@ -1302,12 +1309,20 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler, public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler,
final T processContext, final Callable<Boolean> activeThreadMonitorCallback) { final T processContext, final Callable<Boolean> activeThreadMonitorCallback) {
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
final Runnable stopProcRunnable = new Runnable() { invokeTaskAsCancelableFuture(scheduler, new Callable<Void>() {
@Override
public Void call() throws Exception {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
return null;
}
});
// will continue to monitor active threads, invoking OnStopped once
// there are none
scheduler.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
if (activeThreadMonitorCallback.call()) { if (activeThreadMonitorCallback.call()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
scheduledState.set(ScheduledState.STOPPED); scheduledState.set(ScheduledState.STOPPED);
} else { } else {
@ -1317,8 +1332,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
LOG.warn("Failed while shutting down processor " + processor, e); LOG.warn("Failed while shutting down processor " + processor, e);
} }
} }
}; });
scheduler.execute(stopProcRunnable);
} else { } else {
/* /*
* We do compareAndSet() instead of set() to ensure that Processor * We do compareAndSet() instead of set() to ensure that Processor
@ -1333,41 +1347,35 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
/** /**
* Will invoke processor's methods annotated with @OnSchedule asynchronously * Will invoke lifecycle operation (OnScheduled or OnUnscheduled)
* to ensure that it could be interrupted if stop action was initiated on * asynchronously to ensure that it could be interrupted if stop action was
* the processor that may be sitting in the infinitely blocking @OnSchedule * initiated on the processor that may be infinitely blocking in such
* operation. While this approach paves the way for further enhancements * operation. While this approach paves the way for further enhancements
* related to managing processor'slife-cycle operation at the moment the * related to managing processor'slife-cycle operation at the moment the
* interrupt will not happen automatically. This is primarily to preserve * interrupt will not happen automatically. This is primarily to preserve
* the existing behavior or the NiFi where stop operation can only be * the existing behavior of the NiFi where stop operation can only be
* invoked once the processor is started. Unfortunately that could mean that * invoked once the processor is started. Unfortunately that could mean that
* the processor may be blocking indefinitely in the @Oncheduled call. To * the processor may be blocking indefinitely in lifecycle operation
* deal with that a new NiFi property has been introduced * (OnScheduled or OnUnscheduled). To deal with that a new NiFi property has
* <i>nifi.processor.start.timeout</i> which allows one to set the time (in * been introduced <i>nifi.processor.scheduling.timeout</i> which allows one
* milliseconds) of how long to wait before canceling the @OnScheduled task * to set the time (in milliseconds) of how long to wait before canceling
* allowing processor's stop sequence to proceed. The default value for this * such lifecycle operation (OnScheduled or OnUnscheduled) allowing
* property is {@link Long#MAX_VALUE}. * processor's stop sequence to proceed. The default value for this property
* is {@link Long#MAX_VALUE}.
* <p> * <p>
* NOTE: Canceling the task does not guarantee that the task will actually * NOTE: Canceling the task does not guarantee that the task will actually
* completes (successfully or otherwise), since cancellation of the task * completes (successfully or otherwise), since cancellation of the task
* will issue a simple Thread.interrupt(). However code inside * will issue a simple Thread.interrupt(). However code inside of lifecycle
* of @OnScheduled operation is written purely and will ignore thread * operation (OnScheduled or OnUnscheduled) is written purely and will
* interrupts you may end up with runaway thread which may eventually * ignore thread interrupts you may end up with runaway thread which may
* require NiFi reboot. In any event, the above explanation will be logged * eventually require NiFi reboot. In any event, the above explanation will
* (WARN) informing a user so further actions could be taken. * be logged (WARN) informing a user so further actions could be taken.
* </p> * </p>
*/ */
private void invokeOnScheduleAsync(ScheduledExecutorService taskScheduler, final SchedulingContext schedulingContext) throws ExecutionException { private void invokeTaskAsCancelableFuture(ScheduledExecutorService taskScheduler, Callable<Void> task) {
Future<Void> executionResult = taskScheduler.submit(new Callable<Void>() { Future<Void> executionResult = taskScheduler.submit(task);
@SuppressWarnings("deprecation")
@Override
public Void call() throws Exception {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext);
return null;
}
});
String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT); String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
@ -1375,17 +1383,20 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS); executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName()
+ "' @OnSchedule operation to finish."); + "' lifecycle operation (OnScheduled or OnUnscheduled) to finish.");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (TimeoutException e) { } catch (TimeoutException e) {
executionResult.cancel(true); executionResult.cancel(true);
LOG.warn("Timed out while waiting for the task executing @OnSchedule operation for '" LOG.warn("Timed out while waiting for lifecycle operation (OnScheduled or OnUnscheduled) of '"
+ this.processor.getClass().getSimpleName() + this.processor.getClass().getSimpleName()
+ "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not "
+ "guarantee that the task will be canceled since the code inside @OnSchedule method may " + "guarantee that the task will be canceled since the code inside current lifecycle operation (OnScheduled or OnUnscheduled) may "
+ "have been written to ignore interrupts which may result in runaway thread which could lead to more issues " + "have been written to ignore interrupts which may result in runaway thread which could lead to more issues "
+ "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '"
+ this.processor + "' that needs to be documented, reported and eventually fixed."); + this.processor + "' that needs to be documented, reported and eventually fixed.");
} catch (ExecutionException e){
throw new RuntimeException(
"Failed while executing one of processor's lifecycle tasks (OnScheduled or OnUnscheduled).", e);
} }
} }
} }

View File

@ -409,7 +409,7 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
FlowController fc = this.buildFlowControllerForTest(); FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
@ -439,7 +439,7 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
FlowController fc = this.buildFlowControllerForTest(); FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);