diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 3497e12427..89e1419869 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -124,7 +124,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicReference> idToPathMap = new AtomicReference<>(); private final AtomicBoolean recoveryFinished = new AtomicBoolean(false); - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private volatile long firstEventTimestamp = 0L; // the following are all protected by the lock @@ -630,11 +630,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public synchronized void close() throws IOException { + this.closed.set(true); writeLock.lock(); try { logger.debug("Obtained write lock for close"); - this.closed = true; scheduledExecService.shutdownNow(); rolloverExecutor.shutdownNow(); queryExecService.shutdownNow(); @@ -652,7 +652,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } public boolean isShutdownComplete() { - return this.closed; + return this.closed.get(); } private void persistRecord(final Iterable records) { @@ -1269,6 +1269,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository + "exceeding the provenance recording rate. Slowing down flow to accommodate"); while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { + // if a shutdown happens while we are in this loop, kill the rollover thread and break + if (this.closed.get()) { + future.cancel(true); + break; + } + if (repoSize > sizeThreshold) { logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events"); purgeOldEvents(); @@ -1397,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository */ File mergeJournals(final List journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile); - if ( this.closed ) { + if ( this.closed.get() ) { logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); return null; }