NIFI-916 Refactoring how the closed flag is used in PersistentProvenanceRepository to ensure proper shutdown

This commit is contained in:
Bryan Bende 2015-09-01 14:34:25 -04:00
parent 72ccc252fe
commit 85534ca860
1 changed files with 10 additions and 4 deletions

View File

@ -124,7 +124,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final AtomicReference<SortedMap<Long, Path>> idToPathMap = new AtomicReference<>(); private final AtomicReference<SortedMap<Long, Path>> idToPathMap = new AtomicReference<>();
private final AtomicBoolean recoveryFinished = new AtomicBoolean(false); private final AtomicBoolean recoveryFinished = new AtomicBoolean(false);
private volatile boolean closed = false; private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile long firstEventTimestamp = 0L; private volatile long firstEventTimestamp = 0L;
// the following are all protected by the lock // the following are all protected by the lock
@ -630,11 +630,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
this.closed.set(true);
writeLock.lock(); writeLock.lock();
try { try {
logger.debug("Obtained write lock for close"); logger.debug("Obtained write lock for close");
this.closed = true;
scheduledExecService.shutdownNow(); scheduledExecService.shutdownNow();
rolloverExecutor.shutdownNow(); rolloverExecutor.shutdownNow();
queryExecService.shutdownNow(); queryExecService.shutdownNow();
@ -652,7 +652,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} }
public boolean isShutdownComplete() { public boolean isShutdownComplete() {
return this.closed; return this.closed.get();
} }
private void persistRecord(final Iterable<ProvenanceEventRecord> records) { private void persistRecord(final Iterable<ProvenanceEventRecord> records) {
@ -1269,6 +1269,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
+ "exceeding the provenance recording rate. Slowing down flow to accommodate"); + "exceeding the provenance recording rate. Slowing down flow to accommodate");
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
// if a shutdown happens while we are in this loop, kill the rollover thread and break
if (this.closed.get()) {
future.cancel(true);
break;
}
if (repoSize > sizeThreshold) { if (repoSize > sizeThreshold) {
logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events"); logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
purgeOldEvents(); purgeOldEvents();
@ -1397,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
*/ */
File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException {
logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile); logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
if ( this.closed ) { if ( this.closed.get() ) {
logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile);
return null; return null;
} }