NIFI-9233 - Improve reliability of system integration tests (#5733)

This commit is contained in:
greyp9 2022-02-03 10:20:55 -05:00 committed by GitHub
parent 11bad7b4ed
commit 9dbd066238
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -129,6 +129,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private final AtomicReference<SaveHolder> saveHolder = new AtomicReference<>(null);
private final ClusterCoordinator clusterCoordinator;
private final RevisionManager revisionManager;
private volatile SaveReportingTask saveReportingTask;
/**
* listener/sender for internal cluster communication
@ -283,7 +284,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
running.set(true);
final ScheduledExecutorService newExecutor = new FlowEngine(2, "Flow Service Tasks");
newExecutor.scheduleWithFixedDelay(new SaveReportingTask(), 0L, 500L, TimeUnit.MILLISECONDS);
saveReportingTask = new SaveReportingTask();
newExecutor.scheduleWithFixedDelay(saveReportingTask, 0L, 500L, TimeUnit.MILLISECONDS);
this.executor.set(newExecutor);
if (configuredForClustering) {
@ -332,29 +334,32 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.warn("Protocol sender/listener did not stop gracefully due to: " + ioe);
}
}
final ScheduledExecutorService executorService = executor.get();
if (executorService != null) {
if (force) {
executorService.shutdownNow();
} else {
executorService.shutdown();
}
boolean graceful;
try {
graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
graceful = false;
}
if (!graceful) {
logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window");
}
}
} finally {
writeLock.unlock();
}
final ScheduledExecutorService executorService = executor.get();
if (executorService != null) {
if (force) {
executorService.shutdownNow();
} else {
executorService.shutdown();
}
boolean graceful;
try {
graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
graceful = false;
}
if (!graceful) {
logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window");
}
}
// Ensure that our background save reporting task has a chance to run, because we've now shut down the executor, which could cause the save reporting task to get canceled.
saveReportingTask.run();
}
@Override
@ -1095,7 +1100,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private class SaveReportingTask implements Runnable {
@Override
public void run() {
public synchronized void run() {
ClassLoader currentCl = null;
final Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();