NIFI-566: Fixed issues related to backpressure

Mark Payne 2015-04-30 13:12:07 -04:00
parent 5aeac2ebf3
commit e30cd23fc2
1 changed file with 59 additions and 56 deletions


@@ -240,6 +240,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
expirationActions.add(new FileRemovalAction());
scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
scheduledExecService.scheduleWithFixedDelay(new Runnable() {
@Override
@@ -255,9 +258,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
}, 1L, 1L, TimeUnit.MINUTES);
expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
expirationActions.add(new FileRemovalAction());
}
} finally {
writeLock.unlock();
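The two hunks above move the expiration-action registration ahead of the scheduled tasks and schedule RemoveExpiredQueryResults with a fixed delay. As a reminder of the semantics, scheduleWithFixedDelay measures its delay from the end of one run to the start of the next, so a slow pass (for example, a large purge) can never stack executions on top of each other. A minimal self-contained sketch; the task body is a hypothetical stand-in:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayExample {
    public static void main(String[] args) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);

        // With scheduleWithFixedDelay, the 3-second delay is measured from the
        // *end* of one execution to the *start* of the next, so runs never overlap.
        exec.scheduleWithFixedDelay(() -> {
            // hypothetical stand-in for RemoveExpiredQueryResults
            System.out.println("purging expired query results");
        }, 30L, 3L, TimeUnit.SECONDS); // 30s initial delay, then 3s between runs

        // (in a real service this executor would be shut down on close)
    }
}
```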
@@ -825,8 +825,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
};
// If we have too much data, start aging it off
if (bytesUsed > configuration.getMaxStorageCapacity()) {
// If we have too much data (at least 90% of our max capacity), start aging it off
if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) {
Collections.sort(sortedByBasename, sortByBasenameComparator);
for (final File file : sortedByBasename) {
@@ -879,15 +879,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
// Update the ID-to-Path map so that it no longer includes the removed file
// we use a lock here because otherwise we have a check-then-modify between get/set on the AtomicReference.
// But we keep the AtomicReference because in other places we do just a .get()
writeLock.lock();
try {
logger.debug("Obtained write lock to update ID/Path Map for expiration");
final SortedMap<Long, Path> pathMap = idToPathMap.get();
// We cannot obtain the write lock here because there may be a need for the lock in the rollover method,
// if we have 'backpressure applied'. This would result in a deadlock because the rollover method would be
// waiting for purgeOldEvents, and purgeOldEvents would be waiting for the write lock held by rollover.
boolean updated = false;
while (!updated) {
final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
final SortedMap<Long, Path> newPathMap = new TreeMap<>(new PathMapComparator());
newPathMap.putAll(pathMap);
newPathMap.putAll(existingPathMap);
final Iterator<Map.Entry<Long, Path>> itr = newPathMap.entrySet().iterator();
while (itr.hasNext()) {
final Map.Entry<Long, Path> entry = itr.next();
@@ -898,10 +898,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
itr.remove();
}
}
idToPathMap.set(newPathMap);
updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
logger.debug("After expiration, path map: {}", newPathMap);
} finally {
writeLock.unlock();
}
}
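The new comments above explain the key fix in this hunk: purgeOldEvents can no longer take the write lock, because the rollover method may hold it while waiting for the purge to complete, which would deadlock. The replacement is an optimistic compare-and-set loop on the AtomicReference: re-read the current map, build a modified copy, and retry if another thread swapped the reference in the meantime. A minimal sketch of that pattern, with placeholder map contents:

```java
import java.nio.file.Path;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

// Optimistic, lock-free update of a map held in an AtomicReference: re-read,
// copy, mutate the copy, and retry if another thread replaced the reference.
public final class CasMapUpdate {
    private final AtomicReference<SortedMap<Long, Path>> idToPathMap =
            new AtomicReference<>(new TreeMap<>());

    public void removeEntry(final long firstEventId) {
        boolean updated = false;
        while (!updated) {
            final SortedMap<Long, Path> existing = idToPathMap.get();
            final SortedMap<Long, Path> replacement = new TreeMap<>(existing);
            replacement.remove(firstEventId);

            // compareAndSet succeeds only if no other thread replaced the map
            // since our get(); otherwise loop and rebuild from a fresh read.
            updated = idToPathMap.compareAndSet(existing, replacement);
        }
    }
}
```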
@@ -961,36 +960,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
}
int journalFileCount = getJournalCount();
final int journalCountThreshold = configuration.getJournalCount() * 5;
if ( journalFileCount > journalCountThreshold ) {
logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+ "Slowing down flow to accommodate. Currently, there are {} journal files and "
+ "threshold for blocking is {}", journalFileCount, journalCountThreshold);
eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+ "exceeding the provenance recording rate. Slowing down flow to accommodate");
while (journalFileCount > journalCountThreshold) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
}
logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+ "to accommodate. Currently, there are {} journal files and "
+ "threshold for blocking is {}", journalFileCount, journalCountThreshold);
journalFileCount = getJournalCount();
}
logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
+ "journal files to be rolled over is {}", journalFileCount);
}
writers = createWriters(configuration, idGenerator.get());
streamStartTime.set(System.currentTimeMillis());
recordsWrittenSinceRollover.getAndSet(0);
final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
final List<File> storageDirs = configuration.getStorageDirectories();
final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
@@ -1019,18 +988,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final File file = fileRolledOver;
// update our map of id to Path
// need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
// get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
// it at one time
writeLock.lock();
try {
final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
// We need to make sure that another thread doesn't also update the map at the same time. We cannot
// use the write lock when purging old events, and we want to use the same approach here.
boolean updated = false;
final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
while (!updated) {
final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
newIdToPathMap.putAll(idToPathMap.get());
newIdToPathMap.putAll(existingPathMap);
newIdToPathMap.put(fileFirstEventId, file.toPath());
idToPathMap.set(newIdToPathMap);
} finally {
writeLock.unlock();
updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
}
logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
@@ -1060,6 +1027,42 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
streamStartTime.set(System.currentTimeMillis());
bytesWrittenSinceRollover.set(0);
// We don't want to create new 'writers' until the number of unmerged journals falls below our threshold. So we wait
// here before we repopulate the 'writers' member variable and release the lock.
int journalFileCount = getJournalCount();
long repoSize = getSize(getLogFiles(), 0L);
final int journalCountThreshold = configuration.getJournalCount() * 5;
final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // block once the repo exceeds max capacity by more than 10%
if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+ "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+ "exceeding the provenance recording rate. Slowing down flow to accommodate");
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
}
logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+ "to accommodate. Currently, there are {} journal files ({} bytes) and "
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
journalFileCount = getJournalCount();
repoSize = getSize(getLogFiles(), 0L);
}
logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
+ "journal files to be rolled over is {}", journalFileCount);
}
writers = createWriters(configuration, idGenerator.get());
streamStartTime.set(System.currentTimeMillis());
recordsWrittenSinceRollover.getAndSet(0);
}
}
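Taken together with the earlier hunk that begins aging data off at 90% of max storage capacity, this relocated block gives the repository a hysteresis band: purging starts at 90%, while the flow is only blocked once the repository exceeds 110% of capacity or five times the configured journal count. Moving the wait after the rollover work (rather than before it, as in the removed block) means the backpressure check runs while journals are draining. A standalone sketch of the blocking wait loop; the accessor names are hypothetical stand-ins:

```java
import java.util.concurrent.TimeUnit;

// Illustrative backpressure wait loop: hold the caller until the repository
// has drained below both thresholds. Accessor names are hypothetical.
public class BackpressureSketch {
    static int getJournalCount() { return 0; }   // stand-in for the journal-file count
    static long getRepoSize() { return 0L; }     // stand-in for the on-disk repo size

    static void awaitBelowThresholds(final int configuredJournalCount, final long maxCapacity)
            throws InterruptedException {
        final int journalCountThreshold = configuredJournalCount * 5;   // 5x configured count
        final long sizeThreshold = (long) (maxCapacity * 1.1D);         // 10% over max capacity

        // Sleep-and-recheck: keeping the caller here is what slows the flow,
        // since no new writers are created until the repository catches up.
        while (getJournalCount() > journalCountThreshold || getRepoSize() > sizeThreshold) {
            TimeUnit.SECONDS.sleep(1L);
        }
    }
}
```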