diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 6e05535283..27f2cbbbc7 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -240,6 +240,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 }
             }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
 
+            expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
+            expirationActions.add(new FileRemovalAction());
+
             scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
             scheduledExecService.scheduleWithFixedDelay(new Runnable() {
                 @Override
@@ -255,9 +258,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     }
                 }
             }, 1L, 1L, TimeUnit.MINUTES);
-
-            expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
-            expirationActions.add(new FileRemovalAction());
         }
     } finally {
         writeLock.unlock();
     }
@@ -825,8 +825,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
         };
 
-        // If we have too much data, start aging it off
-        if (bytesUsed > configuration.getMaxStorageCapacity()) {
+        // If we have too much data (at least 90% of our max capacity), start aging it off
+        if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) {
             Collections.sort(sortedByBasename, sortByBasenameComparator);
 
             for (final File file : sortedByBasename) {
@@ -879,15 +879,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
            }
 
            // Update the Map ID to Path map to not include the removed file
-           // we use a lock here because otherwise we have a check-then-modify between get/set on the AtomicReference.
-           // But we keep the AtomicReference because in other places we do just a .get()
-           writeLock.lock();
-           try {
-               logger.debug("Obtained write lock to update ID/Path Map for expiration");
-
-               final SortedMap<Long, Path> pathMap = idToPathMap.get();
+           // We cannot obtain the write lock here because there may be a need for the lock in the rollover method,
+           // if we have 'backpressure applied'. This would result in a deadlock because the rollover method would be
+           // waiting for purgeOldEvents, and purgeOldEvents would be waiting for the write lock held by rollover.
+           boolean updated = false;
+           while (!updated) {
+               final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
                final SortedMap<Long, Path> newPathMap = new TreeMap<>(new PathMapComparator());
-               newPathMap.putAll(pathMap);
+               newPathMap.putAll(existingPathMap);
+
                final Iterator<Map.Entry<Long, Path>> itr = newPathMap.entrySet().iterator();
                while (itr.hasNext()) {
                    final Map.Entry<Long, Path> entry = itr.next();
@@ -898,10 +898,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                        itr.remove();
                    }
                }
 
-               idToPathMap.set(newPathMap);
+
+               updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
                logger.debug("After expiration, path map: {}", newPathMap);
-           } finally {
-               writeLock.unlock();
            }
        }
@@ -961,36 +960,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
            logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
        }
 
-        int journalFileCount = getJournalCount();
-        final int journalCountThreshold = configuration.getJournalCount() * 5;
-        if ( journalFileCount > journalCountThreshold ) {
-            logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
-                    + "Slowing down flow to accomodate. Currently, there are {} journal files and "
-                    + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-            eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
-                    + "exceeding the provenance recording rate. Slowing down flow to accomodate");
-
-            while (journalFileCount > journalCountThreshold) {
-                try {
-                    Thread.sleep(1000L);
-                } catch (final InterruptedException ie) {
-                }
-
-                logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
-                        + "to accomodate. Currently, there are {} journal files and "
-                        + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-
-                journalFileCount = getJournalCount();
-            }
-
-            logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
-                    + "journal files to be rolled over is {}", journalFileCount);
-        }
-
-        writers = createWriters(configuration, idGenerator.get());
-        streamStartTime.set(System.currentTimeMillis());
-        recordsWrittenSinceRollover.getAndSet(0);
-
        final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
        final List<File> storageDirs = configuration.getStorageDirectories();
        final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
@@ -1019,18 +988,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                    final File file = fileRolledOver;
 
                    // update our map of id to Path
-                   // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
-                   // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
-                   // it at one time
-                   writeLock.lock();
-                   try {
-                       final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                   // We need to make sure that another thread doesn't also update the map at the same time. We cannot
+                   // use the write lock when purging old events, and we want to use the same approach here.
+                   boolean updated = false;
+                   final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                   while (!updated) {
+                       final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
                        final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
-                       newIdToPathMap.putAll(idToPathMap.get());
+                       newIdToPathMap.putAll(existingPathMap);
                        newIdToPathMap.put(fileFirstEventId, file.toPath());
-                       idToPathMap.set(newIdToPathMap);
-                   } finally {
-                       writeLock.unlock();
+                       updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
                    }
 
                    logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
@@ -1060,6 +1027,42 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
            streamStartTime.set(System.currentTimeMillis());
            bytesWrittenSinceRollover.set(0);
+
+           // We don't want to create new 'writers' until the number of unmerged journals falls below our threshold. So we wait
+           // here before we repopulate the 'writers' member variable and release the lock.
+           int journalFileCount = getJournalCount();
+           long repoSize = getSize(getLogFiles(), 0L);
+           final int journalCountThreshold = configuration.getJournalCount() * 5;
+           final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // do not go more than 10% over max capacity
+
+           if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
+               logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+                       + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
+                       + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
+               eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+                       + "exceeding the provenance recording rate. Slowing down flow to accommodate");
+
+               while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
+                   try {
+                       Thread.sleep(1000L);
+                   } catch (final InterruptedException ie) {
+                   }
+
+                   logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+                           + "to accommodate. Currently, there are {} journal files ({} bytes) and "
+                           + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
+
+                   journalFileCount = getJournalCount();
+                   repoSize = getSize(getLogFiles(), 0L);
+               }
+
+               logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
+                       + "journal files to be rolled over is {}", journalFileCount);
+           }
+
+           writers = createWriters(configuration, idGenerator.get());
+           streamStartTime.set(System.currentTimeMillis());
+           recordsWrittenSinceRollover.getAndSet(0);
        }
    }
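Reviewer note: both updated call sites replace a write-lock-guarded get/set pair on idToPathMap with an optimistic compare-and-set retry loop. Below is a minimal standalone sketch of that pattern; the class and method names are illustrative, not taken from the NiFi codebase:

import java.nio.file.Path;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the optimistic copy-modify-swap pattern the patch adopts.
public class CasMapUpdateSketch {
    // Readers simply call pathMapRef.get() and never block.
    private final AtomicReference<SortedMap<Long, Path>> pathMapRef =
            new AtomicReference<>(new TreeMap<>());

    public void addMapping(final Long firstEventId, final Path path) {
        boolean updated = false;
        while (!updated) {
            final SortedMap<Long, Path> existing = pathMapRef.get();
            // The copy must be rebuilt from the freshly read 'existing' on every
            // attempt; reusing a copy from a failed attempt would drop updates
            // raced in by other threads.
            final SortedMap<Long, Path> copy = new TreeMap<>(existing);
            copy.put(firstEventId, path);
            // Succeeds only if no other thread swapped the reference since our get().
            updated = pathMapRef.compareAndSet(existing, copy);
        }
    }
}

Because the expiration path no longer needs the write lock at all, it can make progress even while rollover holds that lock during backpressure, which removes the deadlock described in the new purgeOldEvents comment.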
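Reviewer note: the backpressure block now gates on two independent thresholds, journal-file count and on-disk size, and was moved to the end of rollover() so that new writers are only created once the repository has caught up. A compact sketch of that wait loop under the same assumptions as the patch (5x the configured journal count, 110% of max capacity); getJournalCount and getRepoSize here are illustrative stubs, not the repository's real methods:

import java.util.concurrent.TimeUnit;

// Sketch of the dual-threshold wait loop added at the end of rollover().
public class BackpressureSketch {
    // Illustrative stubs standing in for the repository's real accounting.
    private int getJournalCount() { return 0; }
    private long getRepoSize() { return 0L; }

    public void waitUntilCaughtUp(final int configuredJournalCount, final long maxCapacityBytes) {
        final int journalCountThreshold = configuredJournalCount * 5;
        // Allow at most a 10% overshoot of the capacity cap while journals merge.
        final long sizeThreshold = (long) (maxCapacityBytes * 1.1D);

        // Re-check both measures once per second; either one alone keeps the
        // rollover thread (and therefore the flow) blocked.
        while (getJournalCount() > journalCountThreshold || getRepoSize() > sizeThreshold) {
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (final InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}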