diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 7c4015ecc5..498852ad99 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -57,7 +57,6 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -1340,34 +1339,45 @@ public class FileSystemRepository implements ContentRepository { // Delete the oldest data archiveExpirationLog.debug("Deleting data based on timestamp"); - final Iterator itr = notYetExceedingThreshold.iterator(); - int counter = 0; - while (itr.hasNext()) { - final ArchiveInfo archiveInfo = itr.next(); - + int archiveFilesDeleted = 0; + long archiveBytesDeleted = 0L; + for (final ArchiveInfo archiveInfo : notYetExceedingThreshold) { try { final Path path = archiveInfo.toPath(); Files.deleteIfExists(path); containerState.decrementArchiveCount(); + archiveBytesDeleted += archiveInfo.getSize(); LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", archiveInfo.getName(), containerName); // Check if we've freed enough space every 25 files that we destroy - if (++counter % 25 == 0) { + if (++archiveFilesDeleted % 25 == 0) { if (getContainerUsableSpace(containerName) > minRequiredSpace) { // check if we can stop now LOG.debug("Finished cleaning up archive for Container {}", containerName); break; } } + + // If deleting a huge number of files, it can take a while. This may occur when users have a very large number of tiny + // FlowFiles and also have the nifi.content.claim.max.appendable.size property set to a low value. In such a case, this + // process may block processors from performing their job. As a result, we want to periodically log something to let + // users know what is going on, so that the system doesn't appear to just completely freeze up periodically. + if (archiveFilesDeleted % 25_000 == 0 && archiveFilesDeleted > 0) { + LOG.info("So far in this iteration, successfully deleted {} files ({}) from archive because the Content Repository size was exceeding the max configured size. Will continue " + + "deleting files from the archive until the usage drops below the threshold or until all {} archived files have been removed", + archiveFilesDeleted, FormatUtils.formatDataSize(archiveBytesDeleted), notYetExceedingThreshold.size()); + } } catch (final IOException ioe) { LOG.warn("Failed to delete {} from archive due to {}", archiveInfo, ioe.toString()); if (LOG.isDebugEnabled()) { LOG.warn("", ioe); } } - - itr.remove(); } + // Remove the first 'counter' elements from the list because those were removed. + notYetExceedingThreshold.subList(0, archiveFilesDeleted).clear(); + LOG.info("Successfully deleted {} files ({}) from archive", archiveFilesDeleted, FormatUtils.formatDataSize(archiveBytesDeleted)); + final long deleteOldestMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - sortRemainingMillis - deleteExpiredMillis; long oldestContainerArchive; @@ -1568,6 +1578,7 @@ public class FileSystemRepository implements ContentRepository { private final Condition condition = lock.newCondition(); private volatile long bytesUsed = 0L; + private volatile long checkUsedCutoffTimestamp = 0L; public ContainerState(final String containerName, final boolean archiveEnabled, final long backPressureBytes, final long capacity) { this.containerName = containerName; @@ -1586,19 +1597,38 @@ public class FileSystemRepository implements ContentRepository { return false; } + if (archivedFileCount.get() == 0) { + LOG.debug("Waiting to write to container {} is not required because archivedFileCount is 0", containerName); + return false; + } + long used = bytesUsed; - if (used == 0L) { + // We want to calculate the amount of free space & amount of used space if either it's not yet been calculated + // (used == 0) or if it's been at least 1 minute since the space was last checked. + final boolean calculateUsed = (used == 0L) || System.currentTimeMillis() > checkUsedCutoffTimestamp; + if (calculateUsed) { try { final long free = getContainerUsableSpace(containerName); used = capacity - free; bytesUsed = used; + + // Make sure that we check the amount of usable space again in 1 minute. + // This is important because 'bytesUsed' is set in two places: + // here, and in the DestroyExpiredArchives task. However, if there are a huge number + // of archived files, it can take longer to destroy expired archives than to create & archive data. + // As a result, we can have a race condition where that doesn't finish quickly enough and as a result + // this doesn't get updated, so the amount of data archived just grows and grows, eventually leading + // to running out of disk space. + checkUsedCutoffTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1L); } catch (final IOException e) { + checkUsedCutoffTimestamp = 0L; + LOG.warn("Failed to determine how much disk space is available for container {}", containerName, e); return false; } } - return used >= backPressureBytes && archivedFileCount.get() > 0; + return used >= backPressureBytes; } public void waitForArchiveExpiration() { @@ -1630,12 +1660,15 @@ public class FileSystemRepository implements ContentRepository { try { final long free = getContainerUsableSpace(containerName); bytesUsed = capacity - free; + checkUsedCutoffTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1L); } catch (final Exception e) { + LOG.warn("Failed to determine how much disk space is available for container {}", containerName, e); bytesUsed = 0L; + checkUsedCutoffTimestamp = 0L; // Signal that the free space should be calculated again next time it's checked. } LOG.debug("Container {} signaled to allow Content Claim Creation", containerName); - condition.signal(); + condition.signalAll(); } finally { lock.unlock(); }