NIFI-1333 fixed FlowController shutdown deadlock. put read lock back. This closes #148

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Oleg Zhurakousky 2015-12-23 15:41:54 -05:00 committed by Matt Gilman
parent aa6bc5d505
commit f70f7e3447
1 changed files with 17 additions and 10 deletions

View File

@ -1086,7 +1086,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.shutdown = true;
stopAllProcessors();
writeLock.lock();
readLock.lock();
try {
if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) {
throw new IllegalStateException("Controller already stopped or still stopping...");
@ -1099,19 +1099,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else {
this.timerDrivenEngineRef.get().shutdown();
this.eventDrivenEngineRef.get().shutdown();
LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds");
LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds
+ " seconds");
}
clusterTaskExecutor.shutdown();
// Trigger any processors' methods marked with @OnShutdown to be called
// Trigger any processors' methods marked with @OnShutdown to be
// called
rootGroup.shutdown();
// invoke any methods annotated with @OnShutdown on Controller Services
// invoke any methods annotated with @OnShutdown on Controller
// Services
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode,
controllerServiceProvider, null);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class,
serviceNode.getControllerServiceImplementation(), configContext);
}
}
@ -1119,7 +1124,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ConfigurationContext configContext = taskNode.getConfigurationContext();
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(),
configContext);
}
}
@ -1133,14 +1139,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try {
flowFileRepository.close();
} catch (final Throwable t) {
LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] {t});
LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] { t });
}
if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
LOG.info("Controller has been terminated successfully.");
} else {
LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that "
+ "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
+ "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
}
if (externalSiteListener != null) {
@ -1166,8 +1172,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
} finally {
writeLock.unlock();
readLock.unlock();
}
}
/**