From 9dbd06623813dbaf416162227e67850452e0f124 Mon Sep 17 00:00:00 2001 From: greyp9 Date: Thu, 3 Feb 2022 10:20:55 -0500 Subject: [PATCH] NIFI-9233 - Improve reliability of system integration tests (#5733) --- .../nifi/controller/StandardFlowService.java | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 70eb8b0892..f1d7d97d77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -129,6 +129,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private final AtomicReference saveHolder = new AtomicReference<>(null); private final ClusterCoordinator clusterCoordinator; private final RevisionManager revisionManager; + private volatile SaveReportingTask saveReportingTask; /** * listener/sender for internal cluster communication @@ -283,7 +284,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { running.set(true); final ScheduledExecutorService newExecutor = new FlowEngine(2, "Flow Service Tasks"); - newExecutor.scheduleWithFixedDelay(new SaveReportingTask(), 0L, 500L, TimeUnit.MILLISECONDS); + saveReportingTask = new SaveReportingTask(); + newExecutor.scheduleWithFixedDelay(saveReportingTask, 0L, 500L, TimeUnit.MILLISECONDS); this.executor.set(newExecutor); if (configuredForClustering) { @@ -332,29 +334,32 @@ public class StandardFlowService implements FlowService, ProtocolHandler { logger.warn("Protocol sender/listener did not stop gracefully due to: " + ioe); } } - - final ScheduledExecutorService executorService = executor.get(); - if (executorService != null) { - if (force) { - executorService.shutdownNow(); - } else { - executorService.shutdown(); - } - - boolean graceful; - try { - graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - graceful = false; - } - - if (!graceful) { - logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window"); - } - } } finally { writeLock.unlock(); } + + final ScheduledExecutorService executorService = executor.get(); + if (executorService != null) { + if (force) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + + boolean graceful; + try { + graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + graceful = false; + } + + if (!graceful) { + logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window"); + } + } + + // Ensure that our background save reporting task has a chance to run, because we've now shut down the executor, which could cause the save reporting task to get canceled. + saveReportingTask.run(); } @Override @@ -1095,7 +1100,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private class SaveReportingTask implements Runnable { @Override - public void run() { + public synchronized void run() { ClassLoader currentCl = null; final Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();