NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable so that the task doesn't die just die silently in the case of an unexpected error/exception

This closes #3395.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-03-29 09:25:10 -04:00 committed by Bryan Bende
parent 4de51fd3d5
commit 76392ee862
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 54 additions and 0 deletions

View File

@ -149,6 +149,8 @@ public class ConnectableTask {
}
public InvocationResult invoke() {
logger.trace("Triggering {}", connectable);
if (scheduleState.isTerminated()) {
return InvocationResult.DO_NOT_YIELD;
}
@ -165,12 +167,14 @@ public class ConnectableTask {
// Make sure processor has work to do.
if (!isWorkToDo()) {
logger.debug("Yielding {} because it has no work to do", connectable);
return InvocationResult.yield("No work to do");
}
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
logger.debug("Yielding {} because Backpressure is Applied", connectable);
return InvocationResult.yield("Backpressure Applied");
}
}

View File

@ -20,11 +20,14 @@ import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class FlowEngine extends ScheduledThreadPoolExecutor {
@ -79,6 +82,53 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
super.beforeExecute(thread, runnable);
}
@Override
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
return super.schedule(wrap(command), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
return super.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
return super.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
return super.schedule(wrap(callable), delay, unit);
}
private Runnable wrap(final Runnable runnable) {
return new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (final Throwable t) {
logger.error("Uncaught Exception in Runnable task", t);
}
}
};
}
private <T> Callable<T> wrap(final Callable<T> callable) {
return new Callable<T>() {
@Override
public T call() throws Exception {
try {
return callable.call();
} catch (final Throwable t) {
logger.error("Uncaught Exception in Callable task", t);
throw t;
}
}
};
}
/**
* Hook method called by the thread that executed the given runnable after execution of the runnable completed. Logs the fact of completion and any errors that might have occurred.
*